# Copyright 2025 The HuggingFace Team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import gc
import tempfile
import unittest

from transformers import AutoModelForCausalLM, AutoTokenizer, FPQuantConfig
from transformers.testing_utils import (
    backend_empty_cache,
    require_accelerate,
    require_fp_quant,
    require_qutlass,
    require_torch_accelerator,
    require_torch_multi_accelerator,
    slow,
    torch_device,
)


@require_torch_accelerator
class FPQuantConfigTest(unittest.TestCase):
    def test_to_dict(self):
        """
        Simple test that checks if one uses a config and converts it to a dict, the dict is the same as the config object
        """
        quantization_config = FPQuantConfig()
        config_to_dict = quantization_config.to_dict()

        for key in config_to_dict:
            self.assertEqual(getattr(quantization_config, key), config_to_dict[key])

    def test_from_dict(self):
        """
        Simple test that checks if one uses a dict and converts it to a config object, the config object is the same as the dict
        """
        dict = {"modules_to_not_convert": ["embed_tokens", "lm_head"], "quant_method": "fp_quant"}
        quantization_config = FPQuantConfig.from_dict(dict)

        self.assertEqual(dict["modules_to_not_convert"], quantization_config.modules_to_not_convert)
        self.assertEqual(dict["quant_method"], quantization_config.quant_method)


@slow
@require_torch_accelerator
@require_fp_quant
@require_accelerate
class FPQuantBaseTest(unittest.TestCase):
    model_name = "unsloth/Llama-3.2-1B"

    input_text = "1 2 3 4"
    max_new_tokens = 4

    EXPECTED_OUTPUT = "1 2 3 4 5 6"

    device_map = torch_device

    @classmethod
    def getQuantizationConfig(cls):
        unittest.skip("Subclass must implement this method")

    # called only once for all test in this class
    @classmethod
    def setUpClass(cls):
        """
        Setup quantized model
        """

        cls.quantization_config = cls.getQuantizationConfig()
        cls.tokenizer = AutoTokenizer.from_pretrained(cls.model_name)
        cls.quantized_model = AutoModelForCausalLM.from_pretrained(
            cls.model_name, device_map=cls.device_map, quantization_config=cls.quantization_config
        )

    def tearDown(self):
        gc.collect()
        backend_empty_cache(torch_device)
        gc.collect()

    def test_quantized_model(self):
        """
        Simple test that checks if the quantized model is working properly
        """
        input_ids = self.tokenizer(self.input_text, return_tensors="pt").to(torch_device)

        output = self.quantized_model.generate(**input_ids, max_new_tokens=self.max_new_tokens)
        self.assertEqual(self.tokenizer.decode(output[0], skip_special_tokens=True), self.EXPECTED_OUTPUT)

    def test_save_pretrained(self):
        """
        Simple test that checks if the quantized model is working properly after being saved and loaded
        """
        with tempfile.TemporaryDirectory() as tmpdirname:
            self.quantized_model.save_pretrained(tmpdirname)

            model = AutoModelForCausalLM.from_pretrained(tmpdirname, device_map=self.device_map)

            input_ids = self.tokenizer(self.input_text, return_tensors="pt").to(torch_device)

            output = model.generate(**input_ids, max_new_tokens=self.max_new_tokens)
            self.assertEqual(self.tokenizer.decode(output[0], skip_special_tokens=True), self.EXPECTED_OUTPUT)

    @require_torch_multi_accelerator
    def test_quantized_model_multi_accelerator(self):
        """
        Simple test that checks if the quantized model is working properly with multiple accelerators.
        Set CUDA_VISIBLE_DEVICES=0,1 if you have more than 2 CUDA GPUs. Or set ZE_AFFINITY_MASK=0,1
        if you have more than 2 Intel XPUs.
        """
        input_ids = self.tokenizer(self.input_text, return_tensors="pt").to(torch_device)

        quantized_model = AutoModelForCausalLM.from_pretrained(
            self.model_name, device_map="auto", quantization_config=self.quantization_config
        )
        self.assertTrue(set(quantized_model.hf_device_map.values()) == {0, 1})

        output = quantized_model.generate(**input_ids, max_new_tokens=self.max_new_tokens)
        self.assertEqual(self.tokenizer.decode(output[0], skip_special_tokens=True), self.EXPECTED_OUTPUT)

    @require_torch_multi_accelerator
    def test_save_pretrained_multi_accelerator(self):
        """
        Simple test that checks if the quantized model is working properly after being saved and loaded
        """
        with tempfile.TemporaryDirectory() as tmpdirname:
            self.quantized_model.save_pretrained(tmpdirname)

            model = AutoModelForCausalLM.from_pretrained(tmpdirname, device_map="auto")
            self.assertTrue(set(model.hf_device_map.values()) == {0, 1})

            input_ids = self.tokenizer(self.input_text, return_tensors="pt").to(torch_device)

            output = model.generate(**input_ids, max_new_tokens=self.max_new_tokens)
            self.assertEqual(self.tokenizer.decode(output[0], skip_special_tokens=True), self.EXPECTED_OUTPUT)


class FPQuantMXFP4PseudoquantTest(FPQuantBaseTest):
    @classmethod
    def getQuantizationConfig(cls):
        return FPQuantConfig(forward_dtype="mxfp4", pseudoquantization=True)


class FPQuantNVFP4PseudoquantTest(FPQuantBaseTest):
    @classmethod
    def getQuantizationConfig(cls):
        return FPQuantConfig(forward_dtype="nvfp4", pseudoquantization=True)


@require_qutlass
class FPQuantMXFP4Test(FPQuantBaseTest):
    @classmethod
    def getQuantizationConfig(cls):
        return FPQuantConfig(forward_dtype="mxfp4", pseudoquantization=False)


@require_qutlass
class FPQuantNVFP4Test(FPQuantBaseTest):
    @classmethod
    def getQuantizationConfig(cls):
        return FPQuantConfig(forward_dtype="nvfp4", pseudoquantization=False)


@require_qutlass
class FPQuantMXFP4GS128Test(FPQuantBaseTest):
    @classmethod
    def getQuantizationConfig(cls):
        return FPQuantConfig(forward_dtype="mxfp4", pseudoquantization=False, hadamard_group_size=128)


@require_qutlass
class FPQuantNVFP4GS128Test(FPQuantBaseTest):
    @classmethod
    def getQuantizationConfig(cls):
        return FPQuantConfig(forward_dtype="nvfp4", pseudoquantization=False, hadamard_group_size=128)
