# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Testing suite for the PyTorch Xcodec model."""

import inspect
import json
import math
import unittest
from pathlib import Path

import numpy as np
from datasets import Audio, load_dataset
from parameterized import parameterized

from tests.test_configuration_common import ConfigTester
from tests.test_modeling_common import ModelTesterMixin, floats_tensor, ids_tensor
from transformers import AutoFeatureExtractor, XcodecConfig
from transformers.testing_utils import (
    is_torch_available,
    require_deterministic_for_xpu,
    require_torch,
    slow,
    torch_device,
)


if is_torch_available():
    import torch

    from transformers import DacConfig, HubertConfig, XcodecModel


@require_torch
class XcodecModelTester:
    def __init__(
        self,
        parent,
        batch_size=4,
        num_channels=1,
        sample_rate=16000,
        codebook_size=1024,
        num_samples=256,
        is_training=False,
    ):
        self.parent = parent
        self.batch_size = batch_size
        self.num_channels = num_channels
        self.sample_rate = sample_rate
        self.codebook_size = codebook_size
        self.is_training = is_training
        self.num_samples = num_samples
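        # use deliberately tiny acoustic (DAC) and semantic (Hubert) sub-configs so the common tests stay fast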
        self.acoustic_model_config = DacConfig(
            decoder_hidden_size=8, encoder_hidden_size=8, codebook_size=16, downsampling_ratios=[16, 16]
        )
        self.semantic_model_config = HubertConfig(
            hidden_size=32,
            num_hidden_layers=2,
            num_attention_heads=2,
            intermediate_size=12,
            conv_dim=(4, 4, 4, 4, 4, 4, 4),
        )

    def prepare_config_and_inputs(self):
        config = self.get_config()
        inputs_dict = {
            "input_values": floats_tensor([self.batch_size, self.num_channels, self.num_samples], scale=1.0)
        }
        return config, inputs_dict

    def prepare_config_and_inputs_for_common(self):
        config, inputs_dict = self.prepare_config_and_inputs()
        return config, inputs_dict

    def prepare_config_and_inputs_for_model_class(self, model_class):
        config, inputs_dict = self.prepare_config_and_inputs()
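        # the encoder produces one frame of codes per `hop_length` input samples, so round up for a partial final frame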
        codes_length = math.ceil(self.num_samples / config.hop_length)
        inputs_dict["audio_codes"] = ids_tensor(
            [self.batch_size, config.num_quantizers, codes_length], config.codebook_size
        )
        return config, inputs_dict

    def get_config(self):
        return XcodecConfig(
            sample_rate=self.sample_rate,
            audio_channels=self.num_channels,
            codebook_size=self.codebook_size,
            acoustic_model_config=self.acoustic_model_config,
            semantic_model_config=self.semantic_model_config,
        )

    def create_and_check_model_forward(self, config, inputs_dict):
        model = XcodecModel(config=config).to(torch_device).eval()
        result = model(input_values=inputs_dict["input_values"])
        self.parent.assertEqual(result.audio_values.shape, (self.batch_size, self.num_channels, self.num_samples))


@require_torch
class XcodecModelTest(ModelTesterMixin, unittest.TestCase):
    all_model_classes = (XcodecModel,) if is_torch_available() else ()
    is_encoder_decoder = True

    test_resize_embeddings = False

    def _prepare_for_class(self, inputs_dict, model_class, return_labels=False):
        # model does not support returning hidden states or attentions
        inputs_dict = super()._prepare_for_class(inputs_dict, model_class, return_labels=return_labels)
        if "output_attentions" in inputs_dict:
            inputs_dict.pop("output_attentions")
        if "output_hidden_states" in inputs_dict:
            inputs_dict.pop("output_hidden_states")
        return inputs_dict

    def setUp(self):
        self.model_tester = XcodecModelTester(self)
        self.config_tester = ConfigTester(
            self, config_class=XcodecConfig, common_properties=[], has_text_modality=False
        )

    def test_config(self):
        self.config_tester.run_common_tests()

    def test_model_forward(self):
        config_and_inputs = self.model_tester.prepare_config_and_inputs()
        self.model_tester.create_and_check_model_forward(*config_and_inputs)

    def test_forward_signature(self):
        config, _ = self.model_tester.prepare_config_and_inputs_for_common()

        for model_class in self.all_model_classes:
            model = model_class(config)
            signature = inspect.signature(model.forward)
            # signature.parameters is an OrderedDict => so arg_names order is deterministic
            arg_names = [*signature.parameters.keys()]

            expected_arg_names = ["input_values", "audio_codes", "bandwidth", "return_dict"]
            self.assertListEqual(arg_names[: len(expected_arg_names)], expected_arg_names)

    def test_gradient_checkpointing_backward_compatibility(self):
        config, inputs_dict = self.model_tester.prepare_config_and_inputs_for_common()

        for model_class in self.all_model_classes:
            if not model_class.supports_gradient_checkpointing:
                continue

            # XcodecConfig has no `text_encoder`/`audio_encoder`/`decoder` sub-configs;
            # enable gradient checkpointing on the sub-configs it actually exposes
            config.acoustic_model_config.gradient_checkpointing = True
            config.semantic_model_config.gradient_checkpointing = True
            model = model_class(config)
            self.assertTrue(model.is_gradient_checkpointing)

    @unittest.skip(reason="The XcodecModel does not have `inputs_embeds` logics")
    def test_inputs_embeds(self):
        pass

    @unittest.skip(reason="The XcodecModel does not have `inputs_embeds` logics")
    def test_model_get_set_embeddings(self):
        pass

    @unittest.skip(reason="The XcodecModel does not have the usual `attention` logic")
    def test_retain_grad_hidden_states_attentions(self):
        pass

    @unittest.skip(reason="The XcodecModel does not have the usual `attention` logic")
    def test_attention_outputs(self):
        pass

    @unittest.skip(reason="The XcodecModel does not have the usual `hidden_states` logic")
    def test_hidden_states_output(self):
        pass

    # Copied from transformers.tests.encodec.test_modeling_encodec.EncodecModelTest.test_determinism
    def test_determinism(self):
        config, inputs_dict = self.model_tester.prepare_config_and_inputs_for_common()

        def check_determinism(first, second):
            # outputs are not tensors but lists (since each sequence doesn't have the same frame_length)
            out_1 = first.cpu().numpy()
            out_2 = second.cpu().numpy()
            out_1 = out_1[~np.isnan(out_1)]
            out_2 = out_2[~np.isnan(out_2)]
            max_diff = np.amax(np.abs(out_1 - out_2))
            self.assertLessEqual(max_diff, 1e-5)

        for model_class in self.all_model_classes:
            model = model_class(config)
            model.to(torch_device)
            model.eval()
            with torch.no_grad():
                first = model(**self._prepare_for_class(inputs_dict, model_class))[0]
                second = model(**self._prepare_for_class(inputs_dict, model_class))[0]

            if isinstance(first, tuple) and isinstance(second, tuple):
                for tensor1, tensor2 in zip(first, second):
                    check_determinism(tensor1, tensor2)
            else:
                check_determinism(first, second)

    # Copied from transformers.tests.encodec.test_modeling_encodec.EncodecModelTest.test_model_outputs_equivalence
    def test_model_outputs_equivalence(self):
        config, inputs_dict = self.model_tester.prepare_config_and_inputs_for_common()

        def set_nan_tensor_to_zero(t):
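            # NaN is the only value for which `t != t` is True, so this zeroes out NaN entries in place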
            t[t != t] = 0
            return t

        def check_equivalence(model, tuple_inputs, dict_inputs, additional_kwargs={}):
            with torch.no_grad():
                tuple_output = model(**tuple_inputs, return_dict=False, **additional_kwargs)
                dict_output = model(**dict_inputs, return_dict=True, **additional_kwargs)

                self.assertTrue(isinstance(tuple_output, tuple))
                self.assertTrue(isinstance(dict_output, dict))

                for tuple_value, dict_value in zip(tuple_output, dict_output.values()):
                    self.assertTrue(
                        torch.allclose(
                            set_nan_tensor_to_zero(tuple_value), set_nan_tensor_to_zero(dict_value), atol=1e-5
                        ),
                        msg=(
                            "Tuple and dict output are not equal. Difference:"
                            f" {torch.max(torch.abs(tuple_value - dict_value))}. Tuple has `nan`:"
                            f" {torch.isnan(tuple_value).any()} and `inf`: {torch.isinf(tuple_value)}. Dict has"
                            f" `nan`: {torch.isnan(dict_value).any()} and `inf`: {torch.isinf(dict_value)}."
                        ),
                    )

        for model_class in self.all_model_classes:
            model = model_class(config)
            model.to(torch_device)
            model.eval()

            tuple_inputs = self._prepare_for_class(inputs_dict, model_class)
            dict_inputs = self._prepare_for_class(inputs_dict, model_class)
            check_equivalence(model, tuple_inputs, dict_inputs)

    @unittest.skip(reason="The XcodecModel does not have support dynamic compile yet")
    def test_sdpa_can_compile_dynamic(self):
        pass


# Copied from transformers.tests.encodec.test_modeling_encodec.normalize
def normalize(arr):
    norm = np.linalg.norm(arr)
    normalized_arr = arr / norm
    return normalized_arr


# Copied from transformers.tests.encodec.test_modeling_encodec.compute_rmse
def compute_rmse(arr1, arr2):
    """RMSE between the L2-normalized arrays, truncated to their common length."""
    arr1_np = arr1.cpu().numpy().squeeze()
    arr2_np = arr2.cpu().numpy().squeeze()
    min_length = min(arr1_np.shape[-1], arr2_np.shape[-1])
    arr1_np = arr1_np[..., :min_length]
    arr2_np = arr2_np[..., :min_length]
    arr1_normalized = normalize(arr1_np)
    arr2_normalized = normalize(arr2_np)
    return np.sqrt(((arr1_normalized - arr2_normalized) ** 2).mean())


"""
Integration tests for XCodec

Code for reproducing expected outputs can be found here:
https://gist.github.com/ebezzam/cdaf8c223e59e7677b2ea6bc2dc8230b

One reason for higher tolerances is because of different implementation of `Snake1d` within Transformer version DAC
See here: https://github.com/huggingface/transformers/pull/39793#issue-3277407384

"""

RESULTS_PATH = Path(__file__).parent.parent.parent / "fixtures/xcodec/integration_tests.json"

with open(RESULTS_PATH, "r") as f:
    raw_data = json.load(f)
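
# Each fixture entry is a dict with the fields unpacked below. A hypothetical example of the
# expected schema (illustrative values only; the real entries live in integration_tests.json):
#
#     {
#         "repo_id": "<org>/<xcodec-checkpoint>",  # checkpoint to load
#         "bandwidth": 4.0,                        # target bandwidth passed to encode()
#         "codes": [[...]],                        # expected audio codes, or null to skip the check
#         "decoded": [[...]],                      # expected decoded waveform slice, or null to skip
#         "codec_error": 0.01,                     # expected RMSE between input and reconstruction
#         "codec_tol": 1e-3,                       # tolerance for the RMSE check
#         "dec_tol": 1e-4,                         # tolerance for the decoded-waveform check
#     }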

# convert dicts into tuples ordered to match test args
EXPECTED_OUTPUTS_JSON = [
    (
        f"{d['repo_id']}_{d['bandwidth']}",
        d["repo_id"],
        d["bandwidth"],
        d["codes"],
        d["decoded"],
        d["codec_error"],
        d["codec_tol"],
        d["dec_tol"],
    )
    for d in raw_data
]


@slow
@require_torch
class XcodecIntegrationTest(unittest.TestCase):
    @parameterized.expand(EXPECTED_OUTPUTS_JSON)
    @require_deterministic_for_xpu
    def test_integration(
        self, test_name, repo_id, bandwidth, exp_codes, exp_decoded, exp_codec_err, codec_tol, dec_tol
    ):
        # load model
        model = XcodecModel.from_pretrained(repo_id).to(torch_device).eval()
        feature_extractor = AutoFeatureExtractor.from_pretrained(repo_id)

        # load audio example
        librispeech_dummy = load_dataset("hf-internal-testing/librispeech_asr_dummy", "clean", split="validation")
        librispeech_dummy = librispeech_dummy.cast_column(
            "audio", Audio(sampling_rate=feature_extractor.sampling_rate)
        )
        audio_array = librispeech_dummy[0]["audio"]["array"]
        inputs = feature_extractor(
            raw_audio=audio_array, sampling_rate=feature_extractor.sampling_rate, return_tensors="pt"
        ).to(torch_device)
        x = inputs["input_values"]

        with torch.no_grad():
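            # quantizer codes are discrete indices, so they must match the reference exactly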
            ENC_TOL = 0
            audio_codes = model.encode(x, bandwidth=bandwidth, return_dict=False)
            if exp_codes is not None:
                exp_codes = torch.tensor(exp_codes).to(torch_device)
                torch.testing.assert_close(
                    audio_codes[..., : exp_codes.shape[-1]],
                    exp_codes,
                    rtol=ENC_TOL,
                    atol=ENC_TOL,
                )

            # `dec_tol` is loaded per test case from the JSON fixtures
            # (e.g. raised from 1e-5 to 1e-4 to pass at 4 kbps)
            input_values_dec = model.decode(audio_codes).audio_values
            if exp_decoded is not None:
                exp_decoded = torch.tensor(exp_decoded).to(torch_device)
                torch.testing.assert_close(
                    input_values_dec[..., : exp_decoded.shape[-1]],
                    exp_decoded,
                    rtol=dec_tol,
                    atol=dec_tol,
                )

            # compute codec error
            codec_err = compute_rmse(input_values_dec, x)
            torch.testing.assert_close(codec_err, exp_codec_err, rtol=codec_tol, atol=codec_tol)

            # make sure forward and decode give the same result
            audio_values_enc_dec = model(x, bandwidth=bandwidth).audio_values
            torch.testing.assert_close(input_values_dec, audio_values_enc_dec, rtol=1e-6, atol=1e-6)
