# Copyright 2020 The HuggingFace Team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import ast
import collections
import functools
import json
import operator
import os
import re
import sys
import time
from typing import Any

import requests
from compare_test_runs import compare_job_sets
from get_ci_error_statistics import get_jobs
from get_previous_daily_ci import get_last_daily_ci_reports, get_last_daily_ci_run, get_last_daily_ci_workflow_run_id
from huggingface_hub import HfApi
from slack_sdk import WebClient


# A map associating the job names (specified by `inputs.job` in a workflow file) with the keys of
# `additional_files`.
job_to_test_map = {
    "run_models_gpu": "Models",
    "run_trainer_and_fsdp_gpu": "Trainer & FSDP",
    "run_pipelines_torch_gpu": "PyTorch pipelines",
    "run_examples_gpu": "Examples directory",
    "run_torch_cuda_extensions_gpu": "DeepSpeed",
    "run_quantization_torch_gpu": "Quantization",
    "run_kernels_gpu": "Kernels",
}

# The values are used as the file names under which the corresponding CI job results are saved.
test_to_result_name = {
    "Models": "model",
    "Trainer & FSDP": "trainer_and_fsdp",
    "PyTorch pipelines": "torch_pipeline",
    "Examples directory": "example",
    "DeepSpeed": "deepspeed",
    "Quantization": "quantization",
    "Kernels": "kernels",
}

NON_MODEL_TEST_MODULES = [
    "deepspeed",
    "extended",
    "fixtures",
    "generation",
    "onnx",
    "optimization",
    "pipelines",
    "sagemaker",
    "trainer",
    "utils",
    "fsdp",
    "quantization",
    "kernels",
]


def handle_test_results(test_results):
    expressions = test_results.split(" ")

    failed = 0
    success = 0
    errors = 0
    skipped = 0

    # When the output is short enough, it is surrounded by `=` signs: "== OUTPUT ==".
    # When it is too long, those signs are not present.
    # The last expressions could be `'71.60s', '(0:01:11)', '====\n'` or `'in', '35.01s', '================\n'`.
    # Let's always select the one ending in `s` (the elapsed time in seconds).
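    # Illustrative, with the values from the comment above: for an output ending in "... in 71.60s (0:01:11) ====\n",
    # `expressions[-1]` is "====\n" (contains "="), `expressions[-2]` is "(0:01:11)" (contains "("),
    # so we fall back to `expressions[-3]`, i.e. "71.60s".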
    time_spent = expressions[-1]
    if "=" in time_spent:
        time_spent = expressions[-2]
    if "(" in time_spent:
        time_spent = expressions[-3]

    for i, expression in enumerate(expressions):
        if "failed" in expression:
            failed += int(expressions[i - 1])
        if "errors" in expression:
            errors += int(expressions[i - 1])
        if "passed" in expression:
            success += int(expressions[i - 1])
        if "skipped" in expression:
            skipped += int(expressions[i - 1])

    return failed, errors, success, skipped, time_spent


def handle_stacktraces(test_results):
    # These files should follow this structure:
    # === FAILURES ===
    # <path>:<line>: Error ...
    # <path>:<line>: Error ...
    # <empty line>
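    # Illustrative line (path and message are made up):
    #   tests/models/bert/test_modeling_bert.py:1234: AssertionError: Tensors are not close!
    # from which we keep "1234" as the line number and everything after the first space as the error message.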

    total_stacktraces = test_results.split("\n")[1:-1]
    stacktraces = []
    for stacktrace in total_stacktraces:
        try:
            line = stacktrace[: stacktrace.index(" ")].split(":")[-2]
            error_message = stacktrace[stacktrace.index(" ") :]

            stacktraces.append(f"(line {line}) {error_message}")
        except Exception:
            stacktraces.append("Cannot retrieve error message.")

    return stacktraces


def dicts_to_sum(objects: dict[str, dict] | list[dict]):
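    """Sum a collection of integer-valued dicts key-wise.

    Illustrative example (values made up):
        dicts_to_sum([{"single": 1, "multi": 2}, {"single": 3}]) == collections.Counter({"single": 4, "multi": 2})
    """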
    if isinstance(objects, dict):
        lists = objects.values()
    else:
        lists = objects

    # Convert each dictionary to counter
    counters = map(collections.Counter, lists)
    # Sum all the counters
    return functools.reduce(operator.add, counters)


class Message:
    def __init__(
        self,
        title: str,
        ci_title: str,
        model_results: dict,
        additional_results: dict,
        selected_warnings: list | None = None,
        prev_ci_artifacts=None,
        other_ci_artifacts=None,
    ):
        self.title = title
        self.ci_title = ci_title

        # Failures and success of the modeling tests
        self.n_model_success = sum(r["success"] for r in model_results.values())
        self.n_model_single_gpu_failures = sum(dicts_to_sum(r["failed"])["single"] for r in model_results.values())
        self.n_model_multi_gpu_failures = sum(dicts_to_sum(r["failed"])["multi"] for r in model_results.values())

        # Some suites do not have a distinction between single and multi GPU.
        self.n_model_unknown_failures = sum(dicts_to_sum(r["failed"])["unclassified"] for r in model_results.values())
        self.n_model_failures = (
            self.n_model_single_gpu_failures + self.n_model_multi_gpu_failures + self.n_model_unknown_failures
        )
        self.n_model_jobs_errored_out = sum(r["error"] for r in model_results.values())

        # Failures and success of the additional tests
        self.n_additional_success = sum(r["success"] for r in additional_results.values())
        self.n_additional_jobs_errored_out = sum(r["error"] for r in additional_results.values())

        if len(additional_results) > 0:
            # `dicts_to_sum` relies on `functools.reduce`, which fails on an empty iterable, so only sum when there
            # are results.
            all_additional_failures = dicts_to_sum([r["failed"] for r in additional_results.values()])
            self.n_additional_single_gpu_failures = all_additional_failures["single"]
            self.n_additional_multi_gpu_failures = all_additional_failures["multi"]
            self.n_additional_unknown_gpu_failures = all_additional_failures["unclassified"]
        else:
            self.n_additional_single_gpu_failures = 0
            self.n_additional_multi_gpu_failures = 0
            self.n_additional_unknown_gpu_failures = 0

        self.n_additional_failures = (
            self.n_additional_single_gpu_failures
            + self.n_additional_multi_gpu_failures
            + self.n_additional_unknown_gpu_failures
        )

        # Results
        self.n_failures = self.n_model_failures + self.n_additional_failures
        self.n_success = self.n_model_success + self.n_additional_success
        self.n_tests = self.n_failures + self.n_success
        self.n_jobs_errored_out = self.n_model_jobs_errored_out + self.n_additional_jobs_errored_out

        self.model_results = model_results
        self.additional_results = additional_results

        self.thread_ts = None

        if selected_warnings is None:
            selected_warnings = []
        self.selected_warnings = selected_warnings

        self.prev_ci_artifacts = prev_ci_artifacts
        self.other_ci_artifacts = other_ci_artifacts

    @property
    def time(self) -> str:
        all_results = [*self.model_results.values(), *self.additional_results.values()]

        time_spent = []
        for r in all_results:
            if len(r["time_spent"]):
                time_spent.extend(r["time_spent"])
        total_secs = sum(time_spent)

        hours, minutes, seconds = total_secs // 3600, (total_secs % 3600) // 60, total_secs % 60
        return f"{int(hours)}h{int(minutes)}m{int(seconds)}s"

    @property
    def header(self) -> dict:
        return {"type": "header", "text": {"type": "plain_text", "text": self.title}}

    @property
    def ci_title_section(self) -> dict:
        return {"type": "section", "text": {"type": "mrkdwn", "text": self.ci_title}}

    @property
    def no_failures(self) -> dict:
        return {
            "type": "section",
            "text": {
                "type": "plain_text",
                "text": f"[SUCCESS] There were no failures: all {self.n_tests} tests passed. The suite ran in {self.time}.",
                "emoji": True,
            },
            "accessory": {
                "type": "button",
                "text": {"type": "plain_text", "text": "Check Action results", "emoji": True},
                "url": f"https://github.com/huggingface/transformers/actions/runs/{os.environ['GITHUB_RUN_ID']}",
            },
        }

    @property
    def failures(self) -> dict:
        return {
            "type": "section",
            "text": {
                "type": "plain_text",
                "text": (
                    f"There were {self.n_failures} failures, out of {self.n_tests} tests.\n"
                    f"[ERROR] There were {self.n_jobs_errored_out} jobs errored out (not producing test output files).\n"
                    f"The suite ran in {self.time}."
                ),
                "emoji": True,
            },
            "accessory": {
                "type": "button",
                "text": {"type": "plain_text", "text": "Check Action results", "emoji": True},
                "url": f"https://github.com/huggingface/transformers/actions/runs/{os.environ['GITHUB_RUN_ID']}",
            },
        }

    @property
    def warnings(self) -> dict:
        # If something goes wrong, fall back to defaults so the CI report can still be sent.
        button_text = "Check warnings (Link not found)"
        # Use the workflow run link
        job_link = f"https://github.com/huggingface/transformers/actions/runs/{os.environ['GITHUB_RUN_ID']}"

        for job in github_actions_jobs:
            if "Extract warnings in CI artifacts" in job["name"] and job["conclusion"] == "success":
                button_text = "Check warnings"
                # Use the actual job link
                job_link = job["html_url"]
                break

        huggingface_hub_warnings = [x for x in self.selected_warnings if "huggingface_hub" in x]
        text = f"There are {len(self.selected_warnings)} warnings being selected."
        text += f"\n{len(huggingface_hub_warnings)} of them are from `huggingface_hub`."

        return {
            "type": "section",
            "text": {
                "type": "plain_text",
                "text": text,
                "emoji": True,
            },
            "accessory": {
                "type": "button",
                "text": {"type": "plain_text", "text": button_text, "emoji": True},
                "url": job_link,
            },
        }

    @staticmethod
    def get_device_report(report, rjust=6):
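        # Illustrative (values made up): {"single": 3, "multi": 12} -> "     3 |     12 | " with the default rjust=6.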
        if "single" in report and "multi" in report:
            return f"{str(report['single']).rjust(rjust)} | {str(report['multi']).rjust(rjust)} | "
        elif "single" in report:
            return f"{str(report['single']).rjust(rjust)} | {'0'.rjust(rjust)} | "
        elif "multi" in report:
            return f"{'0'.rjust(rjust)} | {str(report['multi']).rjust(rjust)} | "

    @property
    def category_failures(self) -> dict:
        if job_name != "run_models_gpu":
            category_failures_report = ""
            return {"type": "section", "text": {"type": "mrkdwn", "text": category_failures_report}}

        model_failures = [v["failed"] for v in self.model_results.values()]

        category_failures = {}

        for model_failure in model_failures:
            for key, value in model_failure.items():
                if key not in category_failures:
                    category_failures[key] = dict(value)
                else:
                    category_failures[key]["unclassified"] += value["unclassified"]
                    category_failures[key]["single"] += value["single"]
                    category_failures[key]["multi"] += value["multi"]

        individual_reports = []
        for key, value in category_failures.items():
            device_report = self.get_device_report(value)

            if sum(value.values()):
                if device_report:
                    individual_reports.append(f"{device_report}{key}")
                else:
                    individual_reports.append(key)

        header = "Single |  Multi | Category\n"
        category_failures_report = prepare_reports(
            title="The following categories had failures", header=header, reports=individual_reports
        )

        return {"type": "section", "text": {"type": "mrkdwn", "text": category_failures_report}}

    def compute_diff_for_failure_reports(self, curr_failure_report, prev_failure_report):  # noqa
        # Remove the leading and trailing parts that don't contain failure count information.
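        # Illustrative table line being parsed (counts made up), in the `run_models_gpu` format:
        #   "        2 |         0 |         1 | bert"
        # i.e. per-device counts right-justified, separated by "| ", with the module name as the last field.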
        model_failures = curr_failure_report.split("\n")[3:-2]
        prev_model_failures = prev_failure_report.split("\n")[3:-2]
        entries_changed = set(model_failures).difference(prev_model_failures)

        prev_map = {}
        for f in prev_model_failures:
            items = [x.strip() for x in f.split("| ")]
            prev_map[items[-1]] = [int(x) for x in items[:-1]]

        curr_map = {}
        for f in entries_changed:
            items = [x.strip() for x in f.split("| ")]
            curr_map[items[-1]] = [int(x) for x in items[:-1]]

        diff_map = {}
        for k, v in curr_map.items():
            if k not in prev_map:
                diff_map[k] = v
            else:
                diff = [x - y for x, y in zip(v, prev_map[k])]
                if max(diff) > 0:
                    diff_map[k] = diff

        entries_changed = []
        for model_name, diff_values in diff_map.items():
            diff = [str(x) for x in diff_values]
            diff = [f"+{x}" if (x != "0" and not x.startswith("-")) else x for x in diff]
            diff = [x.rjust(9) for x in diff]
            device_report = " | ".join(diff) + " | "
            report = f"{device_report}{model_name}"
            entries_changed.append(report)
        entries_changed = sorted(entries_changed, key=lambda s: s.split("| ")[-1])

        return entries_changed

    @property
    def model_failures(self) -> list[dict]:
        # Obtain per-model failures
        def per_model_sum(model_category_dict):
            return dicts_to_sum(model_category_dict["failed"].values())

        failures = {}
        non_model_failures = {
            k: per_model_sum(v) for k, v in self.model_results.items() if sum(per_model_sum(v).values())
        }

        for k, v in self.model_results.items():
            # The keys in `model_results` may contain things like `models_vit` or `quantization_autoawq`
            # Remove the prefix to make the report cleaner.
            k = k.replace("models_", "").replace("quantization_", "")
            if k in NON_MODEL_TEST_MODULES:
                continue

            if sum(per_model_sum(v).values()):
                dict_failed = dict(v["failed"])

                # Model job has a special form for reporting
                if job_name == "run_models_gpu":
                    pytorch_specific_failures = dict_failed.pop("PyTorch")
                    other_failures = dicts_to_sum(dict_failed.values())

                    failures[k] = {
                        "PyTorch": pytorch_specific_failures,
                        "other": other_failures,
                    }

                else:
                    test_name = job_to_test_map[job_name]
                    specific_failures = dict_failed.pop(test_name)
                    failures[k] = {
                        test_name: specific_failures,
                    }

        model_reports = []
        other_module_reports = []

        for key, value in non_model_failures.items():
            key = key.replace("models_", "").replace("quantization_", "")

            if key in NON_MODEL_TEST_MODULES:
                device_report = self.get_device_report(value)

                if sum(value.values()):
                    if device_report:
                        report = f"{device_report}{key}"
                    else:
                        report = key

                    other_module_reports.append(report)

        for key, value in failures.items():
            # Model job has a special form for reporting
            if job_name == "run_models_gpu":
                device_report_values = [
                    value["PyTorch"]["single"],
                    value["PyTorch"]["multi"],
                    sum(value["other"].values()),
                ]

            else:
                test_name = job_to_test_map[job_name]
                device_report_values = [
                    value[test_name]["single"],
                    value[test_name]["multi"],
                ]

            if sum(device_report_values):
                # This is related to `model_header` below
                rjust_width = 9 if job_name == "run_models_gpu" else 6
                device_report = " | ".join([str(x).rjust(rjust_width) for x in device_report_values]) + " | "
                report = f"{device_report}{key}"

                model_reports.append(report)

        # (Possibly truncated) reports for the current workflow run - to be sent to Slack channels
        if job_name == "run_models_gpu":
            model_header = "Single PT |  Multi PT |     Other | Category\n"
        else:
            model_header = "Single |  Multi | Category\n"

        # Used when calling `prepare_reports` below to prepare the `title` argument
        label = test_to_result_name[job_to_test_map[job_name]]

        sorted_model_reports = sorted(model_reports, key=lambda s: s.split("| ")[-1])
        model_failures_report = prepare_reports(
            title=f"These following {label} modules had failures", header=model_header, reports=sorted_model_reports
        )

        module_header = "Single |  Multi | Category\n"
        sorted_module_reports = sorted(other_module_reports, key=lambda s: s.split("| ")[-1])
        module_failures_report = prepare_reports(
            title=f"The following {label} modules had failures", header=module_header, reports=sorted_module_reports
        )

        # To be sent to Slack channels
        model_failure_sections = [{"type": "section", "text": {"type": "mrkdwn", "text": model_failures_report}}]
        model_failure_sections.append({"type": "section", "text": {"type": "mrkdwn", "text": module_failures_report}})

        # Save the complete (i.e. no truncation) failure tables (of the current workflow run)
        # (to be uploaded as artifacts)

        model_failures_report = prepare_reports(
            title=f"These following {label} modules had failures",
            header=model_header,
            reports=sorted_model_reports,
            to_truncate=False,
        )
        file_path = os.path.join(os.getcwd(), f"ci_results_{job_name}/model_failures_report.txt")
        with open(file_path, "w", encoding="UTF-8") as fp:
            fp.write(model_failures_report)

        module_failures_report = prepare_reports(
            title=f"The following {label} modules had failures",
            header=module_header,
            reports=sorted_module_reports,
            to_truncate=False,
        )
        file_path = os.path.join(os.getcwd(), f"ci_results_{job_name}/module_failures_report.txt")
        with open(file_path, "w", encoding="UTF-8") as fp:
            fp.write(module_failures_report)

        if self.prev_ci_artifacts is not None:
            # If the last run produced an artifact named `ci_results_{job_name}`
            if (
                f"ci_results_{job_name}" in self.prev_ci_artifacts
                and "model_failures_report.txt" in self.prev_ci_artifacts[f"ci_results_{job_name}"]
            ):
                # Compute the difference of the previous/current (model failure) table
                prev_model_failures = self.prev_ci_artifacts[f"ci_results_{job_name}"]["model_failures_report.txt"]
                entries_changed = self.compute_diff_for_failure_reports(model_failures_report, prev_model_failures)
                if len(entries_changed) > 0:
                    # Save the complete difference
                    diff_report = prepare_reports(
                        title="Changed model modules failures",
                        header=model_header,
                        reports=entries_changed,
                        to_truncate=False,
                    )
                    file_path = os.path.join(os.getcwd(), f"ci_results_{job_name}/changed_model_failures_report.txt")
                    with open(file_path, "w", encoding="UTF-8") as fp:
                        fp.write(diff_report)

                    # To be sent to Slack channels
                    diff_report = prepare_reports(
                        title="*Changed model modules failures*",
                        header=model_header,
                        reports=entries_changed,
                    )
                    model_failure_sections.append(
                        {"type": "section", "text": {"type": "mrkdwn", "text": diff_report}},
                    )

        return model_failure_sections

    @property
    def additional_failures(self) -> dict:
        failures = {k: v["failed"] for k, v in self.additional_results.items()}
        errors = {k: v["error"] for k, v in self.additional_results.items()}

        individual_reports = []
        for key, value in failures.items():
            device_report = self.get_device_report(value)

            if sum(value.values()) or errors[key]:
                report = f"{key}"
                if errors[key]:
                    report = f"[Errored out] {report}"
                if device_report:
                    report = f"{device_report}{report}"

                individual_reports.append(report)

        header = "Single |  Multi | Category\n"
        failures_report = prepare_reports(
            title="The following non-modeling tests had failures", header=header, reports=individual_reports
        )

        return {"type": "section", "text": {"type": "mrkdwn", "text": failures_report}}

    @property
    def payload(self) -> str:
        blocks = [self.header]

        if self.ci_title:
            blocks.append(self.ci_title_section)

        if self.n_model_failures > 0 or self.n_additional_failures > 0 or self.n_jobs_errored_out > 0:
            blocks.append(self.failures)

        if self.n_model_failures > 0:
            block = self.category_failures
            if block["text"]["text"]:
                blocks.append(block)

            for block in self.model_failures:
                if block["text"]["text"]:
                    blocks.append(block)

        if self.n_additional_failures > 0:
            blocks.append(self.additional_failures)

        if self.n_model_failures == 0 and self.n_additional_failures == 0:
            blocks.append(self.no_failures)

        if len(self.selected_warnings) > 0:
            blocks.append(self.warnings)

        new_failure_blocks = []
        for idx, (prev_workflow_run_id, prev_ci_artifacts) in enumerate(
            [self.prev_ci_artifacts] + self.other_ci_artifacts
        ):
            if idx == 0:
                # This is the truncated version to show on Slack (for now).
                new_failure_blocks = self.get_new_model_failure_blocks(
                    prev_ci_artifacts=prev_ci_artifacts, with_header=False
                )

            # Save the full (untruncated) list of new model failures and upload it to the Hub dataset repository.
            extra_blocks = self.get_new_model_failure_blocks(prev_ci_artifacts=prev_ci_artifacts, to_truncate=False)
            if extra_blocks:
                filename = "new_failures"
                if idx > 0:
                    filename = f"{filename}_against_{prev_workflow_run_id}"

                failure_text = extra_blocks[-1]["text"]["text"]
                file_path = os.path.join(os.getcwd(), f"ci_results_{job_name}/{filename}.txt")
                with open(file_path, "w", encoding="UTF-8") as fp:
                    fp.write(failure_text)

                # upload results to Hub dataset
                file_path = os.path.join(os.getcwd(), f"ci_results_{job_name}/{filename}.txt")
                _ = api.upload_file(
                    path_or_fileobj=file_path,
                    path_in_repo=f"{report_repo_folder}/ci_results_{job_name}/{filename}.txt",
                    repo_id=report_repo_id,
                    repo_type="dataset",
                    token=os.environ.get("TRANSFORMERS_CI_RESULTS_UPLOAD_TOKEN", None),
                )

                # Extra processing to save the new failures in JSON format
                new_failed_tests = {}
                nb_new_failed_tests = 0
                for line in failure_text.split():
                    if "https://github.com/huggingface/transformers/actions/runs" in line:
                        pattern = r"<(https://github.com/huggingface/transformers/actions/runs/.+?/job/.+?)\|(.+?)>"
                        items = re.findall(pattern, line)
                    elif "tests/" in line:
                        # TODO: Improve the condition here.
                        if "tests/models/" in line or (
                            "tests/quantization/" in line and job_name == "run_quantization_torch_gpu"
                        ):
                            model = line.split("/")[2]
                        else:
                            model = line.split("/")[1]
                        if model not in new_failed_tests:
                            new_failed_tests[model] = {"single-gpu": [], "multi-gpu": []}
                        for _, device in items:
                            new_failed_tests[model][f"{device}-gpu"].append(line)
                            nb_new_failed_tests += 1
                file_path = os.path.join(os.getcwd(), f"ci_results_{job_name}/{filename}.json")
                with open(file_path, "w", encoding="UTF-8") as fp:
                    json.dump(new_failed_tests, fp, ensure_ascii=False, indent=4)

                # upload results to Hub dataset
                file_path = os.path.join(os.getcwd(), f"ci_results_{job_name}/{filename}.json")
                commit_info = api.upload_file(
                    path_or_fileobj=file_path,
                    path_in_repo=f"{report_repo_folder}/ci_results_{job_name}/{filename}.json",
                    repo_id=report_repo_id,
                    repo_type="dataset",
                    token=os.environ.get("TRANSFORMERS_CI_RESULTS_UPLOAD_TOKEN", None),
                )
                new_failures_url = f"https://huggingface.co/datasets/{report_repo_id}/raw/{commit_info.oid}/{report_repo_folder}/ci_results_{job_name}/{filename}.json"

                if idx == 0:
                    block = {
                        "type": "section",
                        "text": {
                            "type": "mrkdwn",
                            "text": f"*There are {nb_new_failed_tests} new failed tests*\n\n(compared to previous run: <https://github.com/huggingface/transformers/actions/runs/{prev_workflow_run_id}|{prev_workflow_run_id}>)",
                        },
                        "accessory": {
                            "type": "button",
                            "text": {"type": "plain_text", "text": "Check new failures"},
                            "url": new_failures_url,
                        },
                    }
                    blocks.append(block)
                else:
                    block = {
                        "type": "section",
                        "text": {
                            "type": "mrkdwn",
                            # TODO: We should NOT assume it's always Nvidia CI, but it's the case at this moment.
                            "text": f"*There are {nb_new_failed_tests} failed tests unique to this run*\n\n(compared to{' Nvidia CI ' if is_scheduled_ci_run else ' '}run: <https://github.com/huggingface/transformers/actions/runs/{prev_workflow_run_id}|{prev_workflow_run_id}>)",
                        },
                        "accessory": {
                            "type": "button",
                            "text": {"type": "plain_text", "text": "Check failures"},
                            "url": new_failures_url,
                        },
                    }
                    blocks.append(block)

        if diff_file_url is not None:
            block = {
                "type": "section",
                "text": {
                    "type": "mrkdwn",
                    "text": f"*Test results diff*\n\n(compared to previous run: <https://github.com/huggingface/transformers/actions/runs/{prev_workflow_run_id}|{prev_workflow_run_id}>)",
                },
                "accessory": {
                    "type": "button",
                    "text": {"type": "plain_text", "text": "Check test result diff file"},
                    "url": diff_file_url,
                },
            }
            blocks.append(block)

        if len(new_failure_blocks) > 0:
            blocks.extend(new_failure_blocks)

        return json.dumps(blocks)

    @staticmethod
    def error_out(title, ci_title="", runner_not_available=False, runner_failed=False, setup_failed=False):
        blocks = []
        title_block = {"type": "header", "text": {"type": "plain_text", "text": title}}
        blocks.append(title_block)

        if ci_title:
            ci_title_block = {"type": "section", "text": {"type": "mrkdwn", "text": ci_title}}
            blocks.append(ci_title_block)

        offline_runners = []
        if runner_not_available:
            text = "[FAIL] CI runners are not available! Tests are not run."
            result = os.environ.get("OFFLINE_RUNNERS")
            if result is not None:
                offline_runners = json.loads(result)
        elif runner_failed:
            text = "[FAIL] CI runners have problems! Tests are not run."
        elif setup_failed:
            text = "[FAIL] Setup job failed. Tests are not run."
        else:
            text = "[FAIL] There was an issue running the tests."

        error_block_1 = {
            "type": "header",
            "text": {
                "type": "plain_text",
                "text": text,
            },
        }

        text = ""
        if len(offline_runners) > 0:
            text = "\n  • " + "\n  • ".join(offline_runners)
            text = f"The following runners are offline:\n{text}\n\n"
        text += "Let's fix it ASAP!"

        error_block_2 = {
            "type": "section",
            "text": {
                "type": "plain_text",
                "text": text,
            },
            "accessory": {
                "type": "button",
                "text": {"type": "plain_text", "text": "Check Action results", "emoji": True},
                "url": f"https://github.com/huggingface/transformers/actions/runs/{os.environ['GITHUB_RUN_ID']}",
            },
        }
        blocks.extend([error_block_1, error_block_2])

        payload = json.dumps(blocks)

        print("Sending the following payload")
        print(json.dumps({"blocks": blocks}))

        client.chat_postMessage(
            channel=SLACK_REPORT_CHANNEL_ID,
            text=text,
            blocks=payload,
        )

    def post(self):
        payload = self.payload
        print("Sending the following payload")
        print(json.dumps({"blocks": json.loads(payload)}))

        text = f"{self.n_failures} failures out of {self.n_tests} tests," if self.n_failures else "All tests passed."

        self.thread_ts = client.chat_postMessage(
            channel=SLACK_REPORT_CHANNEL_ID,
            blocks=payload,
            text=text,
        )

    def get_reply_blocks(self, job_name, job_result, failures, device, text):
        """
        failures: A list with elements of the form {"line": full test name, "trace": error trace}
        """
        # `text` must be less than 3001 characters in Slack SDK
        # keep some room for adding "[Truncated]" when necessary
        MAX_ERROR_TEXT = 3000 - len("[Truncated]")

        failure_text = ""
        for idx, error in enumerate(failures):
            new_text = failure_text + f"*{error['line']}*\n_{error['trace']}_\n\n"
            if len(new_text) > MAX_ERROR_TEXT:
                # `failure_text` here has length <= 3000
                failure_text = failure_text + "[Truncated]"
                break
            # `failure_text` here has length <= MAX_ERROR_TEXT
            failure_text = new_text

        title = job_name
        if device is not None:
            title += f" ({device}-gpu)"

        content = {"type": "section", "text": {"type": "mrkdwn", "text": text}}

        # TODO: Make sure we always have a valid job link (or at least a way not to break the report sending)
        # Currently we get the device from a job's artifact name.
        # If a device is found, the job name should contain the device type, for example, `XXX (single-gpu)`.
        # This could be done by adding `machine_type` in a job's `strategy`.
        # (If `job_result["job_link"][device]` is `None`, we get an error: `... [ERROR] must provide a string ...`)
        if job_result["job_link"] is not None and job_result["job_link"][device] is not None:
            content["accessory"] = {
                "type": "button",
                "text": {"type": "plain_text", "text": "GitHub Action job", "emoji": True},
                "url": job_result["job_link"][device],
            }

        return [
            {"type": "header", "text": {"type": "plain_text", "text": title.upper(), "emoji": True}},
            content,
            {"type": "section", "text": {"type": "mrkdwn", "text": failure_text}},
        ]

    def get_new_model_failure_blocks(self, prev_ci_artifacts, with_header=True, to_truncate=True):
        if prev_ci_artifacts is None:
            return []

        if len(self.model_results) > 0:
            target_results = self.model_results
        else:
            target_results = self.additional_results[job_to_test_map[job_name]]

        # Make the format uniform between `model_results` and `additional_results[XXX]`
        if "failures" in target_results:
            target_results = {job_name: target_results}
        sorted_dict = sorted(target_results.items(), key=lambda t: t[0])

        job = job_to_test_map[job_name]
        prev_model_results = {}
        if (
            f"ci_results_{job_name}" in prev_ci_artifacts
            and f"{test_to_result_name[job]}_results.json" in prev_ci_artifacts[f"ci_results_{job_name}"]
        ):
            prev_model_results = json.loads(
                prev_ci_artifacts[f"ci_results_{job_name}"][f"{test_to_result_name[job]}_results.json"]
            )
            # Make the format uniform between `model_results` and `additional_results[XXX]`
            if "failures" in prev_model_results:
                prev_model_results = {job_name: prev_model_results}

        all_failure_lines = {}
        for job, job_result in sorted_dict:
            if len(job_result["failures"]):
                devices = sorted(job_result["failures"].keys(), reverse=True)
                for device in devices:
                    failures = job_result["failures"][device]
                    prev_error_lines = {}
                    if job in prev_model_results and device in prev_model_results[job]["failures"]:
                        prev_error_lines = {error["line"] for error in prev_model_results[job]["failures"][device]}

                    url = None
                    if job_result["job_link"] is not None and job_result["job_link"][device] is not None:
                        url = job_result["job_link"][device]

                    for idx, error in enumerate(failures):
                        if error["line"] in prev_error_lines:
                            continue

                        new_text = f"{error['line']}\n\n"

                        if new_text not in all_failure_lines:
                            all_failure_lines[new_text] = []

                        all_failure_lines[new_text].append(f"<{url}|{device}>" if url is not None else device)

        MAX_ERROR_TEXT = 3000 - len("[Truncated]") - len("```New failures```\n\n")
        if not to_truncate:
            MAX_ERROR_TEXT = float("inf")
        failure_text = ""
        for line, devices in all_failure_lines.items():
            new_text = failure_text + f"{'|'.join(devices)} gpu\n{line}"
            if len(new_text) > MAX_ERROR_TEXT:
                # `failure_text` here has length <= 3000
                failure_text = failure_text + "[Truncated]"
                break
            # `failure_text` here has length <= MAX_ERROR_TEXT
            failure_text = new_text

        blocks = []
        if failure_text:
            if with_header:
                blocks.append(
                    {"type": "header", "text": {"type": "plain_text", "text": "New failures", "emoji": True}}
                )
            blocks.append({"type": "section", "text": {"type": "mrkdwn", "text": failure_text}})

        return blocks

    def post_reply(self):
        if self.thread_ts is None:
            raise ValueError("Can only post reply if a post has been made.")

        sorted_dict = sorted(self.model_results.items(), key=lambda t: t[0])
        for job, job_result in sorted_dict:
            if len(job_result["failures"]):
                for device, failures in job_result["failures"].items():
                    text = "\n".join(
                        sorted([f"*{k}*: {v[device]}" for k, v in job_result["failed"].items() if v[device]])
                    )

                    blocks = self.get_reply_blocks(job, job_result, failures, device, text=text)

                    print("Sending the following reply")
                    print(json.dumps({"blocks": blocks}))

                    client.chat_postMessage(
                        channel=SLACK_REPORT_CHANNEL_ID,
                        text=f"Results for {job}",
                        blocks=blocks,
                        thread_ts=self.thread_ts["ts"],
                    )

                    time.sleep(1)

        for job, job_result in self.additional_results.items():
            if len(job_result["failures"]):
                for device, failures in job_result["failures"].items():
                    blocks = self.get_reply_blocks(
                        job,
                        job_result,
                        failures,
                        device,
                        text=f"Number of failures: {job_result['failed'][device]}",
                    )

                    print("Sending the following reply")
                    print(json.dumps({"blocks": blocks}))

                    client.chat_postMessage(
                        channel=SLACK_REPORT_CHANNEL_ID,
                        text=f"Results for {job}",
                        blocks=blocks,
                        thread_ts=self.thread_ts["ts"],
                    )

                    time.sleep(1)


def retrieve_artifact(artifact_path: str, gpu: str | None):
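    """Read every file of a downloaded artifact directory into a dict keyed by file stem.

    Illustrative result (the exact keys depend on what the job actually uploaded):
        {"stats": "...", "summary_short": "...", "failures_line": "...", "errors": "..."}
    """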
    if gpu not in [None, "single", "multi"]:
        raise ValueError(f"Invalid GPU for artifact. Passed GPU: `{gpu}`.")

    _artifact = {}

    if os.path.exists(artifact_path):
        files = os.listdir(artifact_path)
        for file in files:
            try:
                with open(os.path.join(artifact_path, file)) as f:
                    _artifact[file.split(".")[0]] = f.read()
            except UnicodeDecodeError as e:
                raise ValueError(f"Could not open {os.path.join(artifact_path, file)}.") from e

    return _artifact


def retrieve_available_artifacts():
    class Artifact:
        def __init__(self, name: str, single_gpu: bool = False, multi_gpu: bool = False):
            self.name = name
            self.single_gpu = single_gpu
            self.multi_gpu = multi_gpu
            self.paths = []

        def __str__(self):
            return self.name

        def add_path(self, path: str, gpu: str | None = None):
            self.paths.append({"name": self.name, "path": path, "gpu": gpu})

    _available_artifacts: dict[str, Artifact] = {}

    directories = filter(os.path.isdir, os.listdir())
    for directory in directories:
        artifact_name = directory
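        # Artifact directories follow patterns such as `single-gpu_run_models_gpu_models_bert_test_reports`
        # (the exact naming is set in the workflow files; this example is illustrative).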

        name_parts = artifact_name.split("_postfix_")
        if len(name_parts) > 1:
            artifact_name = name_parts[0]

        if artifact_name.startswith("single-gpu"):
            artifact_name = artifact_name[len("single-gpu") + 1 :]

            if artifact_name in _available_artifacts:
                _available_artifacts[artifact_name].single_gpu = True
            else:
                _available_artifacts[artifact_name] = Artifact(artifact_name, single_gpu=True)

            _available_artifacts[artifact_name].add_path(directory, gpu="single")

        elif artifact_name.startswith("multi-gpu"):
            artifact_name = artifact_name[len("multi-gpu") + 1 :]

            if artifact_name in _available_artifacts:
                _available_artifacts[artifact_name].multi_gpu = True
            else:
                _available_artifacts[artifact_name] = Artifact(artifact_name, multi_gpu=True)

            _available_artifacts[artifact_name].add_path(directory, gpu="multi")
        else:
            if artifact_name not in _available_artifacts:
                _available_artifacts[artifact_name] = Artifact(artifact_name)

            _available_artifacts[artifact_name].add_path(directory)

    return _available_artifacts


def prepare_reports(title, header, reports, to_truncate=True):
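    """Build a Slack-friendly fenced report, truncated (unless `to_truncate=False`) to respect Slack's
    3001-character text limit.

    Illustrative output (title and rows are made up):
        The following categories had failures:
        ```
        Single |  Multi | Category
             1 |      0 | PyTorch
        ```
    """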
    report = ""

    MAX_ERROR_TEXT = 3000 - len("[Truncated]")
    if not to_truncate:
        MAX_ERROR_TEXT = float("inf")

    if len(reports) > 0:
        # `text` must be less than 3001 characters in Slack SDK
        # keep some room for adding "[Truncated]" when necessary

        for idx in range(len(reports)):
            _report = header + "\n".join(reports[: idx + 1])
            new_report = f"{title}:\n```\n{_report}\n```\n"
            if len(new_report) > MAX_ERROR_TEXT:
                # `report` here has length <= 3000
                report = report + "[Truncated]"
                break
            report = new_report

    return report


def pop_default(l: list[Any], i: int, default: Any) -> Any:
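    """`list.pop` with a default value instead of an `IndexError`.

    Illustrative: pop_default([], 0, "Cannot retrieve error message.") returns the default string.
    """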
    try:
        return l.pop(i)
    except IndexError:
        return default


if __name__ == "__main__":
    api = HfApi()
    client = WebClient(token=os.environ["CI_SLACK_BOT_TOKEN"])

    SLACK_REPORT_CHANNEL_ID = os.environ["SLACK_REPORT_CHANNEL"]

    # runner_status = os.environ.get("RUNNER_STATUS")
    # runner_env_status = os.environ.get("RUNNER_ENV_STATUS")
    setup_status = os.environ.get("SETUP_STATUS")

    # runner_not_available = True if runner_status is not None and runner_status != "success" else False
    # runner_failed = True if runner_env_status is not None and runner_env_status != "success" else False
    # Let's keep the lines regarding runners' status (we might be able to use them again in the future)
    runner_not_available = False
    runner_failed = False
    # Some jobs don't depend (`needs`) on the job `setup`: in this case, the status of the job `setup` is `skipped`.
    setup_failed = setup_status not in ["skipped", "success"]

    org = "huggingface"
    repo = "transformers"
    repository_full_name = f"{org}/{repo}"

    # This env variable is set in the workflow file (under the job `send_results`).
    ci_event = os.environ["CI_EVENT"]

    # To find the PR number in a commit title, for example, `Add AwesomeFormer model (#99999)`
    pr_number_re = re.compile(r"\(#(\d+)\)$")

    # Add Commit/PR title with a link for push CI
    ci_title = os.environ.get("CI_TITLE", "")
    ci_sha = os.environ.get("CI_SHA")

    ci_url = None
    if ci_sha:
        ci_url = f"https://github.com/{repository_full_name}/commit/{ci_sha}"

    if ci_title:
        if ci_url is None:
            raise ValueError(
                "When a title is found (`ci_title`), it means a `push` event or a `workflow_run` even (triggered by "
                "another `push` event), and the commit SHA has to be provided in order to create the URL to the "
                "commit page."
            )
        ci_title = ci_title.strip().split("\n")[0].strip()

        # Retrieve the PR title and author login to complete the report
        commit_number = ci_url.split("/")[-1]
        ci_detail_url = f"https://api.github.com/repos/{repository_full_name}/commits/{commit_number}"
        ci_details = requests.get(ci_detail_url).json()
        ci_author = ci_details["author"]["login"]

        merged_by = None
        # Find the PR number (if any) and change the url to the actual PR page.
        numbers = pr_number_re.findall(ci_title)
        if len(numbers) > 0:
            pr_number = numbers[0]
            ci_detail_url = f"https://api.github.com/repos/{repository_full_name}/pulls/{pr_number}"
            ci_details = requests.get(ci_detail_url).json()

            ci_author = ci_details["user"]["login"]
            ci_url = f"https://github.com/{repository_full_name}/pull/{pr_number}"

            merged_by = ci_details["merged_by"]["login"]

        if merged_by is None:
            ci_title = f"<{ci_url}|{ci_title}>\nAuthor: GH_{ci_author}"
        else:
            ci_title = f"<{ci_url}|{ci_title}>\nAuthor: GH_{ci_author} | Merged by: GH_{merged_by}"

    elif ci_sha:
        ci_title = f"<{ci_url}|commit: {ci_sha}>"

    else:
        ci_title = ""

    # `title` will be updated at the end before calling `Message()`.
    title = f"[INFO] Results of {ci_event}"
    if runner_not_available or runner_failed or setup_failed:
        Message.error_out(title, ci_title, runner_not_available, runner_failed, setup_failed)
        exit(0)

    # sys.argv[0] is always `utils/notification_service.py`.
    arguments = sys.argv[1:]
    # In our usage in `.github/workflows/slack-report.yml`, we always pass an argument when calling this script.
    # The argument could be an empty string `""` if a job doesn't depend on the job `setup`.
    if arguments[0] == "":
        job_matrix = []
    else:
        job_matrix_as_str = arguments[0]
        try:
            folder_slices = ast.literal_eval(job_matrix_as_str)
            if len(folder_slices) > 0:
                if isinstance(folder_slices[0], list):
                    # Need to change from elements like `models/bert` to `models_bert` (the ones used as artifact names).
                    job_matrix = [
                        x.replace("models/", "models_").replace("quantization/", "quantization_")
                        for folders in folder_slices
                        for x in folders
                    ]
                elif isinstance(folder_slices[0], str):
                    job_matrix = [
                        x.replace("models/", "models_").replace("quantization/", "quantization_")
                        for x in folder_slices
                    ]
        except Exception:
            Message.error_out(title, ci_title)
            raise ValueError("Errored out.")

    github_actions_jobs = get_jobs(
        workflow_run_id=os.environ["GITHUB_RUN_ID"], token=os.environ["ACCESS_REPO_INFO_TOKEN"]
    )
    github_actions_job_links = {job["name"]: job["html_url"] for job in github_actions_jobs}

    artifact_name_to_job_map = {}
    for job in github_actions_jobs:
        for step in job["steps"]:
            if step["name"].startswith("Test suite reports artifacts: "):
                artifact_name = step["name"][len("Test suite reports artifacts: ") :]
                artifact_name_to_job_map[artifact_name] = job
                break

    available_artifacts = retrieve_available_artifacts()

    test_categories = [
        "PyTorch",
        "Tokenizers",
        "Pipelines",
        "Trainer",
        "ONNX",
        "Auto",
        "Quantization",
        "Unclassified",
    ]

    job_name = os.getenv("CI_TEST_JOB")
    report_name_prefix = job_name

    # This dict will contain all the information relative to each matrix job:
    # - Failed: the number of failures per category defined above, split by device
    # - Errors / Success / Skipped: totals
    # - Time spent: a list of elapsed times (one per test report)
    # - Failures: the individual failing tests together with their error traces, split by device
    matrix_job_results = {
        matrix_name: {
            "failed": {m: {"unclassified": 0, "single": 0, "multi": 0} for m in test_categories},
            "errors": 0,
            "success": 0,
            "skipped": 0,
            "time_spent": [],
            "error": False,
            "failures": {},
            "job_link": {},
            "captured_info": {},
        }
        for matrix_name in job_matrix
        if f"{report_name_prefix}_{matrix_name}_test_reports" in available_artifacts
    }

    matrix_job_results_extra = {
        matrix_name: {
            "captured_info": {},
        }
        for matrix_name in job_matrix
        if f"{report_name_prefix}_{matrix_name}_test_reports" in available_artifacts
    }

    unclassified_model_failures = []

    for matrix_name in matrix_job_results:
        for artifact_path_dict in available_artifacts[f"{report_name_prefix}_{matrix_name}_test_reports"].paths:
            path = artifact_path_dict["path"]
            artifact_gpu = artifact_path_dict["gpu"]

            if path not in artifact_name_to_job_map:
                # Mismatch between available artifacts and reported jobs on GitHub. It happens.
                continue

            artifact = retrieve_artifact(path, artifact_gpu)

            if "summary_short" not in artifact:
                # The process might have been killed (for example, by CPU OOM), or the job canceled for some reason, etc.
                matrix_job_results[matrix_name]["error"] = True

            if "stats" in artifact:
                # Link to the GitHub Action job
                job = artifact_name_to_job_map[path]
                matrix_job_results[matrix_name]["job_link"][artifact_gpu] = job["html_url"]
                failed, errors, success, skipped, time_spent = handle_test_results(artifact["stats"])
                matrix_job_results[matrix_name]["success"] += success
                matrix_job_results[matrix_name]["errors"] += errors
                matrix_job_results[matrix_name]["skipped"] += skipped
                matrix_job_results[matrix_name]["time_spent"].append(float(time_spent[:-1]))

                stacktraces = handle_stacktraces(artifact["failures_line"])

                # Add the captured actual outputs for patched methods (`torch.testing.assert_close`, `assertEqual` etc.)
                if "captured_info" in artifact:
                    step_number = None
                    for step in job.get("steps", []):
                        if step["name"] == "Captured information":
                            step_number = step["number"]
                            break
                    if step_number is not None:
                        step_link = f"{job['html_url']}#step:{step_number}:1"
                        matrix_job_results[matrix_name]["captured_info"][artifact_gpu] = step_link
                        matrix_job_results_extra[matrix_name]["captured_info"][artifact_gpu] = {
                            "link": step_link,
                            "captured_info": artifact["captured_info"],
                        }

                for line in artifact["summary_short"].split("\n"):
                    if line.startswith("FAILED "):
                        # Avoid the extra `FAILED` entry given by `run_test_using_subprocess`, which would cause an
                        # issue when calling `stacktraces.pop` below.
                        # See `run_test_using_subprocess` in `src/transformers/testing_utils.py`
                        if " - Failed: (subprocess)" in line:
                            continue
                        line = line[len("FAILED ") :]
                        line = line.split()[0].replace("\n", "")

                        if artifact_gpu not in matrix_job_results[matrix_name]["failures"]:
                            matrix_job_results[matrix_name]["failures"][artifact_gpu] = []

                        trace = pop_default(stacktraces, 0, "Cannot retrieve error message.")
                        matrix_job_results[matrix_name]["failures"][artifact_gpu].append(
                            {"line": line, "trace": trace}
                        )

                        # TODO: How to deal with this?

                        if re.search("tests/quantization", line):
                            matrix_job_results[matrix_name]["failed"]["Quantization"][artifact_gpu] += 1

                        elif re.search("test_modeling", line):
                            matrix_job_results[matrix_name]["failed"]["PyTorch"][artifact_gpu] += 1

                        elif re.search("test_tokenization", line):
                            matrix_job_results[matrix_name]["failed"]["Tokenizers"][artifact_gpu] += 1

                        elif re.search("test_pipelines", line):
                            matrix_job_results[matrix_name]["failed"]["Pipelines"][artifact_gpu] += 1

                        elif re.search("test_trainer", line):
                            matrix_job_results[matrix_name]["failed"]["Trainer"][artifact_gpu] += 1

                        elif re.search("onnx", line):
                            matrix_job_results[matrix_name]["failed"]["ONNX"][artifact_gpu] += 1

                        elif re.search("auto", line):
                            matrix_job_results[matrix_name]["failed"]["Auto"][artifact_gpu] += 1

                        else:
                            matrix_job_results[matrix_name]["failed"]["Unclassified"][artifact_gpu] += 1
                            unclassified_model_failures.append(line)

    # Additional runs
    additional_files = {
        "PyTorch pipelines": "run_pipelines_torch_gpu_test_reports",
        "Examples directory": "run_examples_gpu_test_reports",
        "DeepSpeed": "run_torch_cuda_extensions_gpu_test_reports",
        "Kernels": "run_kernels_gpu_test_reports",
    }

    if ci_event in ["push", "Nightly CI"] or ci_event.startswith("Past CI"):
        del additional_files["Examples directory"]
        del additional_files["PyTorch pipelines"]
    elif ci_event.startswith("Scheduled CI (AMD)"):
        del additional_files["DeepSpeed"]
    elif ci_event.startswith("Push CI (AMD)"):
        additional_files = {}

    report_repo_id = os.getenv("REPORT_REPO_ID")

    # If it is not a scheduled run, upload the reports to a subfolder under `report_repo_folder`.
    report_repo_subfolder = ""
    if os.getenv("GITHUB_EVENT_NAME") != "schedule":
        report_repo_subfolder = f"{os.getenv('GITHUB_RUN_NUMBER')}-{os.getenv('GITHUB_RUN_ID')}"
        report_repo_subfolder = f"runs/{report_repo_subfolder}"

    workflow_run = get_last_daily_ci_run(
        token=os.environ["ACCESS_REPO_INFO_TOKEN"], workflow_run_id=os.getenv("GITHUB_RUN_ID")
    )
    workflow_run_created_time = workflow_run["created_at"]
    workflow_id = workflow_run["workflow_id"]

    report_repo_folder = workflow_run_created_time.split("T")[0]
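    # i.e. the run date, e.g. "2025-06-30" (illustrative), taken from the ISO-8601 `created_at` timestamp above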

    if report_repo_subfolder:
        report_repo_folder = f"{report_repo_folder}/{report_repo_subfolder}"

    # Keep only the entry in `additional_files` that corresponds to the current job (if any).
    test_name = None
    if job_name in job_to_test_map:
        test_name = job_to_test_map[job_name]
    additional_files = {k: v for k, v in additional_files.items() if k == test_name}

    additional_results = {
        key: {
            "failed": {"unclassified": 0, "single": 0, "multi": 0},
            "errors": 0,
            "success": 0,
            "skipped": 0,
            "time_spent": [],
            "error": False,
            "failures": {},
            "job_link": {},
        }
        for key in additional_files
    }

    for key in additional_results:
        # If a whole suite of tests fails, the artifact isn't available.
        if additional_files[key] not in available_artifacts:
            additional_results[key]["error"] = True
            continue

        for artifact_path_dict in available_artifacts[additional_files[key]].paths:
            path = artifact_path_dict["path"]
            artifact_gpu = artifact_path_dict["gpu"]

            # Link to the GitHub Action job
            job = artifact_name_to_job_map[path]
            additional_results[key]["job_link"][artifact_gpu] = job["html_url"]

            artifact = retrieve_artifact(path, artifact_gpu)
            stacktraces = handle_stacktraces(artifact["failures_line"])

            failed, errors, success, skipped, time_spent = handle_test_results(artifact["stats"])
            additional_results[key]["failed"][artifact_gpu or "unclassified"] += failed
            additional_results[key]["success"] += success
            additional_results[key]["errors"] += errors
            additional_results[key]["skipped"] += skipped
            additional_results[key]["time_spent"].append(float(time_spent[:-1]))

            if len(artifact["errors"]):
                additional_results[key]["error"] = True

            if failed:
                for line in artifact["summary_short"].split("\n"):
                    if line.startswith("FAILED "):
                        # Skip the extra `FAILED` entry produced by `run_test_using_subprocess`; otherwise it would
                        # cause issues when popping from `stacktraces` below.
                        # See `run_test_using_subprocess` in `src/transformers/testing_utils.py`.
                        if " - Failed: (subprocess)" in line:
                            continue
                        line = line[len("FAILED ") :]
                        line = line.split()[0].replace("\n", "")

                        if artifact_gpu not in additional_results[key]["failures"]:
                            additional_results[key]["failures"][artifact_gpu] = []

                        trace = pop_default(stacktraces, 0, "Cannot retrieve error message.")
                        additional_results[key]["failures"][artifact_gpu].append({"line": line, "trace": trace})

    # Let's only check the warnings for the model testing job. Currently, the job `run_extract_warnings` is only run
    # when `inputs.job` (in the workflow file) is `run_models_gpu`. The reason is that we would otherwise need to save
    # several artifacts with different names, which complicates the logic for an insignificant part of the CI reporting.
    selected_warnings = []
    if job_name == "run_models_gpu":
        if "warnings_in_ci" in available_artifacts:
            directory = available_artifacts["warnings_in_ci"].paths[0]["path"]
            with open(os.path.join(directory, "selected_warnings.json")) as fp:
                selected_warnings = json.load(fp)

    if not os.path.isdir(os.path.join(os.getcwd(), f"ci_results_{job_name}")):
        os.makedirs(os.path.join(os.getcwd(), f"ci_results_{job_name}"))

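    # Determine whether the current run comes from one of the Nvidia or AMD scheduled daily CI workflows: this is used
    # below to pick which runs to compare against and whether to produce the AMD diff report.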
    nvidia_daily_ci_workflow = (
        "huggingface/transformers/.github/workflows/self-scheduled-caller.yml",
        "huggingface/transformers/.github/workflows/self-scheduled-flash-attn-caller.yml",
    )
    amd_daily_ci_workflows = (
        "huggingface/transformers/.github/workflows/self-scheduled-amd-mi325-caller.yml",
        "huggingface/transformers/.github/workflows/self-scheduled-amd-mi355-caller.yml",
    )
    is_nvidia_daily_ci_workflow = os.environ.get("GITHUB_WORKFLOW_REF", "").startswith(nvidia_daily_ci_workflow)
    is_amd_daily_ci_workflow = os.environ.get("GITHUB_WORKFLOW_REF", "").startswith(amd_daily_ci_workflows)

    is_scheduled_ci_run = os.environ.get("GITHUB_EVENT_NAME") == "schedule"
    # For AMD workflow runs: the different AMD CI callers (MI210/MI250/MI300, etc.) are triggered by the `workflow_run`
    # event of `.github/workflows/self-scheduled-amd-caller.yml`.
    if os.environ.get("GITHUB_EVENT_NAME") == "workflow_run":
        # Get the path to the file on the runner that contains the full event webhook payload.
        event_payload_path = os.environ.get("GITHUB_EVENT_PATH")
        # Load the event payload
        with open(event_payload_path) as fp:
            event_payload = json.load(fp)
            # The event that triggered the workflow referenced by this `workflow_run` event.
            if "workflow_run" in event_payload:
                is_scheduled_ci_run = event_payload["workflow_run"]["event"] == "schedule"

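    # Dump each (test suite, results) pair to a JSON file and upload it to the Hub dataset repo holding the CI reports.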
    test_name_and_result_pairs = []
    if len(matrix_job_results) > 0:
        test_name = job_to_test_map[job_name]
        test_name_and_result_pairs.append((test_name, matrix_job_results))

    for test_name, result in additional_results.items():
        test_name_and_result_pairs.append((test_name, result))

    for test_name, result in test_name_and_result_pairs:
        with open(f"ci_results_{job_name}/{test_to_result_name[test_name]}_results.json", "w", encoding="UTF-8") as fp:
            json.dump(result, fp, indent=4, ensure_ascii=False)

        api.upload_file(
            path_or_fileobj=f"ci_results_{job_name}/{test_to_result_name[test_name]}_results.json",
            path_in_repo=f"{report_repo_folder}/ci_results_{job_name}/{test_to_result_name[test_name]}_results.json",
            repo_id=report_repo_id,
            repo_type="dataset",
            token=os.environ.get("TRANSFORMERS_CI_RESULTS_UPLOAD_TOKEN", None),
        )

    if len(matrix_job_results_extra) > 0:
        with open(
            f"ci_results_{job_name}/{test_to_result_name[test_name]}_results_extra.json", "w", encoding="UTF-8"
        ) as fp:
            json.dump(matrix_job_results_extra, fp, indent=4, ensure_ascii=False)

        api.upload_file(
            path_or_fileobj=f"ci_results_{job_name}/{test_to_result_name[test_name]}_results_extra.json",
            path_in_repo=f"{report_repo_folder}/ci_results_{job_name}/{test_to_result_name[test_name]}_results_extra.json",
            repo_id=report_repo_id,
            repo_type="dataset",
            token=os.environ.get("TRANSFORMERS_CI_RESULTS_UPLOAD_TOKEN", None),
        )

    # Let's create a file containing the job --> job link mapping.
    if len(matrix_job_results) > 0:
        target_results = matrix_job_results
    else:
        default_result = {
            "failed": {"unclassified": 0, "single": 0, "multi": 0},
            "success": 0,
            "time_spent": [],
            "error": False,
            "failures": {},
            "job_link": {},
        }

        key = job_to_test_map.get(job_name)
        target_results = additional_results.get(key, default_result) if key is not None else default_result

    # Make the format uniform between `matrix_job_results` and `additional_results[XXX]`
    if "failures" in target_results:
        target_results = {job_name: target_results}

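    # Strip the `models_` / `quantization_` prefixes so that only the short job names appear as keys.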
    job_links = {}
    sorted_dict = sorted(target_results.items(), key=lambda t: t[0])
    for job, job_result in sorted_dict:
        if job.startswith("models_"):
            job = job[len("models_") :]
        elif job.startswith("quantization_"):
            job = job[len("quantization_") :]
        job_links[job] = job_result["job_link"]

    with open(f"ci_results_{job_name}/job_links.json", "w", encoding="UTF-8") as fp:
        json.dump(job_links, fp, indent=4, ensure_ascii=False)

    api.upload_file(
        path_or_fileobj=f"ci_results_{job_name}/job_links.json",
        path_in_repo=f"{report_repo_folder}/ci_results_{job_name}/job_links.json",
        repo_id=report_repo_id,
        repo_type="dataset",
        token=os.environ.get("TRANSFORMERS_CI_RESULTS_UPLOAD_TOKEN", None),
    )

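    # Determine which previous workflow run(s) to compare the current results against, depending on how this run was
    # triggered (scheduled run, `run-slow` comment on a PR, or explicit run ids passed via environment variables).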
    prev_workflow_run_id = None
    other_workflow_run_ids = []

    if is_scheduled_ci_run:
        prev_workflow_run_id = get_last_daily_ci_workflow_run_id(
            token=os.environ["ACCESS_REPO_INFO_TOKEN"], workflow_id=workflow_id
        )
        # For a scheduled run that is not Nvidia's scheduled daily CI, add Nvidia's scheduled daily CI run as a comparison target.
        if not is_nvidia_daily_ci_workflow:
            # The id of the workflow `.github/workflows/self-scheduled-caller.yml` (not of a workflow run of it).
            other_workflow_id = "90575235"
            # We need to get Nvidia's scheduled daily CI run that matches the current run (i.e. the run with the same commit SHA).
            other_workflow_run_id = get_last_daily_ci_workflow_run_id(
                token=os.environ["ACCESS_REPO_INFO_TOKEN"], workflow_id=other_workflow_id, commit_sha=ci_sha
            )
            other_workflow_run_ids.append(other_workflow_run_id)
    # triggered via `issue_comment` for CI on pull requests (e.g. using the comment `run-slow:`)
    elif os.environ.get("GITHUB_EVENT_NAME") in ["issue_comment"]:
        # TODO (ydshieh): Make this flexible once we implement `run-slow` for AMD CI and others.
        # The id of the workflow `.github/workflows/self-scheduled-caller.yml` (not of a workflow run of it).
        prev_workflow_id = "90575235"
        # TODO (ydshieh): Ideally, make sure to use the last completed scheduled workflow run whose commit is a parent
        # of the PR's `merge_commit`.
        prev_workflow_run_id = get_last_daily_ci_workflow_run_id(
            token=os.environ["ACCESS_REPO_INFO_TOKEN"], workflow_id=prev_workflow_id
        )
    else:
        prev_workflow_run_id = os.environ["PREV_WORKFLOW_RUN_ID"]
        other_workflow_run_id = os.environ["OTHER_WORKFLOW_RUN_ID"]
        other_workflow_run_ids.append(other_workflow_run_id)

    prev_ci_artifacts = (None, None)
    other_ci_artifacts = []

    output_dir = os.path.join(os.getcwd(), "previous_reports")
    os.makedirs(output_dir, exist_ok=True)

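    # Download the `ci_results_{job_name}` artifacts of the previous run (and of any other target runs) so they can be
    # compared against the current results when building the report.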
    for idx, target_workflow_run_id in enumerate([prev_workflow_run_id] + other_workflow_run_ids):
        if target_workflow_run_id is None or target_workflow_run_id == "":
            continue
        artifact_names = [f"ci_results_{job_name}"]
        ci_artifacts = get_last_daily_ci_reports(
            artifact_names=artifact_names,
            output_dir=output_dir,
            token=os.environ["ACCESS_REPO_INFO_TOKEN"],
            workflow_run_id=target_workflow_run_id,
        )
        if idx == 0:
            prev_ci_artifacts = (target_workflow_run_id, ci_artifacts)
        else:
            other_ci_artifacts.append((target_workflow_run_id, ci_artifacts))

    # Only for AMD at this moment.
    # TODO: put this into a method
    diff_file_url = None
    if is_amd_daily_ci_workflow:
        if not (prev_workflow_run_id is None or prev_workflow_run_id == ""):
            ci_artifacts = get_last_daily_ci_reports(
                artifact_names=None,
                output_dir=output_dir,
                token=os.environ["ACCESS_REPO_INFO_TOKEN"],
                workflow_run_id=prev_workflow_run_id,
            )

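            # Pair each `*_test_reports` directory of the current run with the corresponding one downloaded from the
            # previous run (via their `summary_short.txt` files) and produce a diff of the test results.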
            current_artifacts = sorted([d for d in os.listdir() if os.path.isdir(d) and d.endswith("_test_reports")])
            prev_artifacts = sorted([d for d in os.listdir(output_dir) if os.path.isdir(os.path.join(output_dir, d)) and d.endswith("_test_reports")])  # fmt: skip

            current_artifacts_set = {d: os.path.join(d, "summary_short.txt") for d in current_artifacts}
            prev_artifacts_set = {d: os.path.join(output_dir, d, "summary_short.txt") for d in prev_artifacts}

            report = compare_job_sets(prev_artifacts_set, current_artifacts_set)

            with open(f"ci_results_{job_name}/test_results_diff.json", "w") as fp:
                fp.write(report)

            # Upload the diff report to the Hub.
            commit_info = api.upload_file(
                path_or_fileobj=f"ci_results_{job_name}/test_results_diff.json",
                path_in_repo=f"{report_repo_folder}/ci_results_{job_name}/test_results_diff.json",
                repo_id=report_repo_id,
                repo_type="dataset",
                token=os.environ.get("TRANSFORMERS_CI_RESULTS_UPLOAD_TOKEN", None),
            )
            diff_file_url = f"https://huggingface.co/datasets/{report_repo_id}/resolve/{commit_info.oid}/{report_repo_folder}/ci_results_{job_name}/test_results_diff.json"

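    # The report title shows the CI event and the human-readable name of the current test suite (if any).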
    ci_name_in_report = ""
    if job_name in job_to_test_map:
        ci_name_in_report = job_to_test_map[job_name]

    title = f"[INFO] Results of {ci_event}: {ci_name_in_report}"

    message = Message(
        title,
        ci_title,
        matrix_job_results,
        additional_results,
        selected_warnings=selected_warnings,
        prev_ci_artifacts=prev_ci_artifacts,
        other_ci_artifacts=other_ci_artifacts,
    )

    # For push CI, send the report only if there is any failure; for other events, always send it.
    if message.n_failures or (ci_event != "push" and not ci_event.startswith("Push CI (AMD)")):
        message.post()
        message.post_reply()
