# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

# Code generated by the Google Gen AI SDK generator DO NOT EDIT.

import io
import json
import logging
import mimetypes
import os
from typing import Any, Optional, Union
from urllib.parse import urlencode

from . import _api_module
from . import _common
from . import _transformers as t
from . import types
from ._common import get_value_by_path as getv
from ._common import set_value_by_path as setv
from .pagers import AsyncPager, Pager


logger = logging.getLogger('google_genai.files')


def _ListFilesConfig_to_mldev(
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
) -> dict[str, Any]:
  """Copies the list-files paging options into the parent's `_query` entry.

  Args:
    from_object: The list config (dict or object) that is read for
      `page_size` and `page_token`.
    parent_object: The dict that receives `_query.pageSize` and
      `_query.pageToken` for the values that are set.

  Returns:
    An empty dict; the values handled here are written to `parent_object`
    rather than returned.
  """
  page_size = getv(from_object, ['page_size'])
  if page_size is not None:
    setv(parent_object, ['_query', 'pageSize'], page_size)

  page_token = getv(from_object, ['page_token'])
  if page_token is not None:
    setv(parent_object, ['_query', 'pageToken'], page_token)

  converted: dict[str, Any] = {}
  return converted


def _ListFilesParameters_to_mldev(
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
) -> dict[str, Any]:
  """Builds the request dict for a list-files call.

  Args:
    from_object: The list parameters (dict or object); only `config` is read.
    parent_object: Unused by this converter.

  Returns:
    A new dict. When `config` is set, it holds a `config` entry plus whatever
    `_ListFilesConfig_to_mldev` wrote into it (the `_query` values).
  """
  request: dict[str, Any] = {}

  config = getv(from_object, ['config'])
  if config is None:
    return request

  # The config converter writes the query parameters straight into `request`.
  setv(request, ['config'], _ListFilesConfig_to_mldev(config, request))
  return request


def _FileStatus_to_mldev(
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
) -> dict[str, Any]:
  """Copies the status fields that are set into a new dict.

  Args:
    from_object: The status (dict or object) read for `details`, `message`
      and `code`.
    parent_object: Unused by this converter.

  Returns:
    A new dict with the same keys, restricted to the values that are not
    None.
  """
  converted: dict[str, Any] = {}
  for field in ('details', 'message', 'code'):
    value = getv(from_object, [field])
    if value is not None:
      setv(converted, [field], value)

  return converted


def _File_to_mldev(
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
) -> dict[str, Any]:
  """Converts a file from snake_case fields to a camelCase-keyed dict.

  Every field that is set (not None) on `from_object` is copied under its
  camelCase key; a present `error` is converted with `_FileStatus_to_mldev`.

  Args:
    from_object: The file (dict or object) to read fields from.
    parent_object: Unused by this converter.

  Returns:
    A new dict holding only the fields that were set on `from_object`.
  """
  # (field read from `from_object`, key written to the result), in order.
  field_map = (
      ('name', 'name'),
      ('display_name', 'displayName'),
      ('mime_type', 'mimeType'),
      ('size_bytes', 'sizeBytes'),
      ('create_time', 'createTime'),
      ('expiration_time', 'expirationTime'),
      ('update_time', 'updateTime'),
      ('sha256_hash', 'sha256Hash'),
      ('uri', 'uri'),
      ('download_uri', 'downloadUri'),
      ('state', 'state'),
      ('source', 'source'),
      ('video_metadata', 'videoMetadata'),
  )

  converted: dict[str, Any] = {}
  for source_field, target_key in field_map:
    value = getv(from_object, [source_field])
    if value is not None:
      setv(converted, [target_key], value)

  error = getv(from_object, ['error'])
  if error is not None:
    setv(converted, ['error'], _FileStatus_to_mldev(error, converted))

  return converted


def _CreateFileParameters_to_mldev(
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
) -> dict[str, Any]:
  """Builds the request dict for a create-file call.

  Args:
    from_object: The create parameters (dict or object) read for `file` and
      `config`.
    parent_object: Unused by this converter.

  Returns:
    A new dict with `file` (converted by `_File_to_mldev`) and `config`
    (copied as is), each only when set.
  """
  request: dict[str, Any] = {}

  file = getv(from_object, ['file'])
  if file is not None:
    setv(request, ['file'], _File_to_mldev(file, request))

  config = getv(from_object, ['config'])
  if config is not None:
    setv(request, ['config'], config)

  return request


def _GetFileParameters_to_mldev(
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
) -> dict[str, Any]:
  """Builds the request dict for a get-file call.

  Args:
    from_object: The get parameters (dict or object) read for `name` and
      `config`.
    parent_object: Unused by this converter.

  Returns:
    A new dict with `_url.file` (the name passed through `t.t_file_name`)
    and `config` (copied as is), each only when set.
  """
  request: dict[str, Any] = {}

  name = getv(from_object, ['name'])
  if name is not None:
    setv(request, ['_url', 'file'], t.t_file_name(name))

  config = getv(from_object, ['config'])
  if config is not None:
    setv(request, ['config'], config)

  return request


def _DeleteFileParameters_to_mldev(
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
) -> dict[str, Any]:
  """Builds the request dict for a delete-file call.

  Args:
    from_object: The delete parameters (dict or object) read for `name` and
      `config`.
    parent_object: Unused by this converter.

  Returns:
    A new dict with `_url.file` (the name passed through `t.t_file_name`)
    and `config` (copied as is), each only when set.
  """
  request: dict[str, Any] = {}

  name = getv(from_object, ['name'])
  if name is not None:
    setv(request, ['_url', 'file'], t.t_file_name(name))

  config = getv(from_object, ['config'])
  if config is not None:
    setv(request, ['config'], config)

  return request


def _FileStatus_from_mldev(
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
) -> dict[str, Any]:
  """Copies the status fields that are set in a response into a new dict.

  Args:
    from_object: The status (dict or object) read for `details`, `message`
      and `code`.
    parent_object: Unused by this converter.

  Returns:
    A new dict with the same keys, restricted to the values that are not
    None.
  """
  converted: dict[str, Any] = {}
  for field in ('details', 'message', 'code'):
    value = getv(from_object, [field])
    if value is not None:
      setv(converted, [field], value)

  return converted


def _File_from_mldev(
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
) -> dict[str, Any]:
  """Converts a file from camelCase keys to a snake_case-keyed dict.

  Every key that is set (not None) on `from_object` is copied under its
  snake_case name; a present `error` is converted with
  `_FileStatus_from_mldev`.

  Args:
    from_object: The file (dict or object) to read keys from.
    parent_object: Unused by this converter.

  Returns:
    A new dict holding only the fields that were set on `from_object`.
  """
  # (key read from `from_object`, field written to the result), in order.
  field_map = (
      ('name', 'name'),
      ('displayName', 'display_name'),
      ('mimeType', 'mime_type'),
      ('sizeBytes', 'size_bytes'),
      ('createTime', 'create_time'),
      ('expirationTime', 'expiration_time'),
      ('updateTime', 'update_time'),
      ('sha256Hash', 'sha256_hash'),
      ('uri', 'uri'),
      ('downloadUri', 'download_uri'),
      ('state', 'state'),
      ('source', 'source'),
      ('videoMetadata', 'video_metadata'),
  )

  converted: dict[str, Any] = {}
  for source_key, target_field in field_map:
    value = getv(from_object, [source_key])
    if value is not None:
      setv(converted, [target_field], value)

  error = getv(from_object, ['error'])
  if error is not None:
    setv(converted, ['error'], _FileStatus_from_mldev(error, converted))

  return converted


def _ListFilesResponse_from_mldev(
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
) -> dict[str, Any]:
  """Converts a list-files response to a snake_case-keyed dict.

  Args:
    from_object: The response (dict or object) read for `sdkHttpResponse`,
      `nextPageToken` and `files`.
    parent_object: Unused by this converter.

  Returns:
    A new dict with `sdk_http_response`, `next_page_token` and `files` (each
    entry converted by `_File_from_mldev`), each only when set.
  """
  converted: dict[str, Any] = {}

  sdk_http_response = getv(from_object, ['sdkHttpResponse'])
  if sdk_http_response is not None:
    setv(converted, ['sdk_http_response'], sdk_http_response)

  next_page_token = getv(from_object, ['nextPageToken'])
  if next_page_token is not None:
    setv(converted, ['next_page_token'], next_page_token)

  files = getv(from_object, ['files'])
  if files is not None:
    converted_files = [_File_from_mldev(entry, converted) for entry in files]
    setv(converted, ['files'], converted_files)

  return converted


def _CreateFileResponse_from_mldev(
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
) -> dict[str, Any]:
  """Converts a create-file response to a snake_case-keyed dict.

  Args:
    from_object: The response (dict or object); only `sdkHttpResponse` is
      read.
    parent_object: Unused by this converter.

  Returns:
    A new dict with `sdk_http_response` when it is set, otherwise empty.
  """
  converted: dict[str, Any] = {}

  sdk_http_response = getv(from_object, ['sdkHttpResponse'])
  if sdk_http_response is not None:
    setv(converted, ['sdk_http_response'], sdk_http_response)

  return converted


def _DeleteFileResponse_from_mldev(
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
) -> dict[str, Any]:
  """Converts a delete-file response to a snake_case-keyed dict.

  Args:
    from_object: The response (dict or object); only `sdkHttpResponse` is
      read.
    parent_object: Unused by this converter.

  Returns:
    A new dict with `sdk_http_response` when it is set, otherwise empty.
  """
  converted: dict[str, Any] = {}

  sdk_http_response = getv(from_object, ['sdkHttpResponse'])
  if sdk_http_response is not None:
    setv(converted, ['sdk_http_response'], sdk_http_response)

  return converted


class Files(_api_module.BaseModule):

  def _list(
      self, *, config: Optional[types.ListFilesConfigOrDict] = None
  ) -> types.ListFilesResponse:
    """Lists files from the service with a single GET request.

    Args:
      config (ListFilesConfig): Optional, configuration for the list method.

    Returns:
      ListFilesResponse: The response for the list method, with
      `sdk_http_response` set from the response headers.

    Raises:
      ValueError: If the client is a Vertex AI client.

    Usage:

    .. code-block:: python

      pager = client.files.list(config={'page_size': 10})
      for file in pager.page:
        print(file.name)
    """
    params = types._ListFilesParameters(config=config)

    if self._api_client.vertexai:
      raise ValueError(
          'This method is only supported in the Gemini Developer client.'
      )

    request_dict = _ListFilesParameters_to_mldev(params)
    url_values: Optional[dict[str, str]] = request_dict.get('_url')
    path = 'files'
    if url_values:
      path = path.format_map(url_values)

    query = request_dict.get('_query')
    if query:
      path = f'{path}?{urlencode(query)}'
    # TODO: remove the hack that pops config.
    request_dict.pop('config', None)

    # Per-call HTTP options come from the validated config, when present.
    list_config = params.config
    http_options: Optional[types.HttpOptions] = (
        None if list_config is None else list_config.http_options
    )

    request_dict = _common.encode_unserializable_types(
        _common.convert_to_dict(request_dict)
    )

    response = self._api_client.request('get', path, request_dict, http_options)

    response_dict = json.loads(response.body) if response.body else ''

    if not self._api_client.vertexai:
      response_dict = _ListFilesResponse_from_mldev(response_dict)

    result = types.ListFilesResponse._from_response(
        response=response_dict, kwargs=params.model_dump()
    )
    result.sdk_http_response = types.HttpResponse(headers=response.headers)
    self._api_client._verify_response(result)
    return result

  def _create(
      self,
      *,
      file: types.FileOrDict,
      config: Optional[types.CreateFileConfigOrDict] = None,
  ) -> types.CreateFileResponse:
    """Sends the create-file request to `upload/v1beta/files`.

    Args:
      file: The file metadata to send in the request body.
      config: Optional configuration. Its `http_options` are used for the
        request. When `should_return_http_response` is truthy, the raw
        response is returned in `sdk_http_response` and the body is not
        parsed. Both are honored for model and dict configs alike.

    Returns:
      CreateFileResponse: Either a wrapper around the raw HTTP response or
      the response parsed from the body.

    Raises:
      ValueError: If the client is a Vertex AI client.
    """
    parameter_model = types._CreateFileParameters(
        file=file,
        config=config,
    )

    if self._api_client.vertexai:
      raise ValueError(
          'This method is only supported in the Gemini Developer client.'
      )

    request_dict = _CreateFileParameters_to_mldev(parameter_model)
    request_url_dict: Optional[dict[str, str]] = request_dict.get('_url')
    if request_url_dict:
      path = 'upload/v1beta/files'.format_map(request_url_dict)
    else:
      path = 'upload/v1beta/files'

    query_params = request_dict.get('_query')
    if query_params:
      path = f'{path}?{urlencode(query_params)}'
    # TODO: remove the hack that pops config.
    request_dict.pop('config', None)

    http_options: Optional[types.HttpOptions] = None
    if (
        parameter_model.config is not None
        and parameter_model.config.http_options is not None
    ):
      http_options = parameter_model.config.http_options

    request_dict = _common.convert_to_dict(request_dict)
    request_dict = _common.encode_unserializable_types(request_dict)

    response = self._api_client.request(
        'post', path, request_dict, http_options
    )

    # Read the flag from the validated config rather than the raw argument:
    # `getattr` on a dict config never finds the key, so the flag used to be
    # ignored whenever the config was passed as a dict.
    if getattr(parameter_model.config, 'should_return_http_response', None):
      return_value = types.CreateFileResponse(sdk_http_response=response)
      self._api_client._verify_response(return_value)
      return return_value

    response_dict = '' if not response.body else json.loads(response.body)

    if not self._api_client.vertexai:
      response_dict = _CreateFileResponse_from_mldev(response_dict)

    return_value = types.CreateFileResponse._from_response(
        response=response_dict, kwargs=parameter_model.model_dump()
    )

    self._api_client._verify_response(return_value)
    return return_value

  def get(
      self, *, name: str, config: Optional[types.GetFileConfigOrDict] = None
  ) -> types.File:
    """Retrieves the file information from the service.

    Args:
      name (str): The name identifier for the file to retrieve.
      config (GetFileConfig): Optional, configuration for the get method.

    Returns:
      File: The file information.

    Raises:
      ValueError: If the client is a Vertex AI client.

    Usage:

    .. code-block:: python

      file = client.files.get(name='files/...')
      print(file.uri)
    """
    params = types._GetFileParameters(name=name, config=config)

    if self._api_client.vertexai:
      raise ValueError(
          'This method is only supported in the Gemini Developer client.'
      )

    request_dict = _GetFileParameters_to_mldev(params)
    url_values: Optional[dict[str, str]] = request_dict.get('_url')
    path = 'files/{file}'
    if url_values:
      path = path.format_map(url_values)

    query = request_dict.get('_query')
    if query:
      path = f'{path}?{urlencode(query)}'
    # TODO: remove the hack that pops config.
    request_dict.pop('config', None)

    # Per-call HTTP options come from the validated config, when present.
    get_config = params.config
    http_options: Optional[types.HttpOptions] = (
        None if get_config is None else get_config.http_options
    )

    request_dict = _common.encode_unserializable_types(
        _common.convert_to_dict(request_dict)
    )

    response = self._api_client.request('get', path, request_dict, http_options)

    response_dict = json.loads(response.body) if response.body else ''

    if not self._api_client.vertexai:
      response_dict = _File_from_mldev(response_dict)

    result = types.File._from_response(
        response=response_dict, kwargs=params.model_dump()
    )

    self._api_client._verify_response(result)
    return result

  def delete(
      self, *, name: str, config: Optional[types.DeleteFileConfigOrDict] = None
  ) -> types.DeleteFileResponse:
    """Deletes a remotely stored file.

    Args:
      name (str): The name identifier for the file to delete.
      config (DeleteFileConfig): Optional, configuration for the delete method.

    Returns:
      DeleteFileResponse: The response for the delete method, with
      `sdk_http_response` set from the response headers.

    Raises:
      ValueError: If the client is a Vertex AI client.

    Usage:

    .. code-block:: python

      client.files.delete(name='files/...')
    """
    params = types._DeleteFileParameters(name=name, config=config)

    if self._api_client.vertexai:
      raise ValueError(
          'This method is only supported in the Gemini Developer client.'
      )

    request_dict = _DeleteFileParameters_to_mldev(params)
    url_values: Optional[dict[str, str]] = request_dict.get('_url')
    path = 'files/{file}'
    if url_values:
      path = path.format_map(url_values)

    query = request_dict.get('_query')
    if query:
      path = f'{path}?{urlencode(query)}'
    # TODO: remove the hack that pops config.
    request_dict.pop('config', None)

    # Per-call HTTP options come from the validated config, when present.
    delete_config = params.config
    http_options: Optional[types.HttpOptions] = (
        None if delete_config is None else delete_config.http_options
    )

    request_dict = _common.encode_unserializable_types(
        _common.convert_to_dict(request_dict)
    )

    response = self._api_client.request(
        'delete', path, request_dict, http_options
    )

    response_dict = json.loads(response.body) if response.body else ''

    if not self._api_client.vertexai:
      response_dict = _DeleteFileResponse_from_mldev(response_dict)

    result = types.DeleteFileResponse._from_response(
        response=response_dict, kwargs=params.model_dump()
    )
    result.sdk_http_response = types.HttpResponse(headers=response.headers)
    self._api_client._verify_response(result)
    return result

  def upload(
      self,
      *,
      file: Union[str, os.PathLike[str], io.IOBase],
      config: Optional[types.UploadFileConfigOrDict] = None,
  ) -> types.File:
    """Calls the API to upload a file using a supported file service.

    Args:
      file: A path to the file or an `IOBase` object to be uploaded. If it's an
        IOBase object, it must be opened in blocking (the default) mode and
        binary mode. In other words, do not use non-blocking mode or text mode.
        The given stream must be seekable, that is, `seek()` and `tell()` must
        work on it; the upload size is measured from the stream's current
        position to its end.
      config: Optional parameters to set `display_name`, `mime_type`, and
        `name`. A `name` that does not start with `files/` gets that prefix.
        Any `http_options` given here are copied, not modified; the copy's
        `api_version` and `headers` are replaced with the values the upload
        request needs.

    Returns:
      File: The file built from the `file` entry of the upload response.

    Raises:
      ValueError: If the client is a Vertex AI client, if no MIME type is
        available (it must be given for streams and is guessed from the path
        otherwise), or if a stream exposes a `mode` without `'b'`.
      FileNotFoundError: If `file` is a path that is not an existing file.
      KeyError: If the create request did not return an `x-goog-upload-url`
        header.
    """
    if self._api_client.vertexai:
      raise ValueError(
          'This method is only supported in the Gemini Developer client.'
      )
    config_model = types.UploadFileConfig()
    if config:
      if isinstance(config, dict):
        config_model = types.UploadFileConfig(**config)
      else:
        config_model = config
      file_obj = types.File(
          mime_type=config_model.mime_type,
          name=config_model.name,
          display_name=config_model.display_name,
      )
    else:  # if not config
      file_obj = types.File()
    if file_obj.name is not None and not file_obj.name.startswith('files/'):
      file_obj.name = f'files/{file_obj.name}'

    upload_source: Union[str, io.IOBase]
    if isinstance(file, io.IOBase):
      if file_obj.mime_type is None:
        raise ValueError(
            'Unknown mime type: Could not determine the mimetype for your'
            ' file\n please set the `mime_type` argument'
        )
      if hasattr(file, 'mode'):
        if 'b' not in file.mode:
          raise ValueError('The file must be opened in binary mode.')
      # Measure the bytes left from the current position, then restore it.
      offset = file.tell()
      file.seek(0, os.SEEK_END)
      file_obj.size_bytes = file.tell() - offset
      file.seek(offset, os.SEEK_SET)
      upload_source = file
    else:
      fs_path = os.fspath(file)
      if not fs_path or not os.path.isfile(fs_path):
        raise FileNotFoundError(f'{file} is not a valid file path.')
      file_obj.size_bytes = os.path.getsize(fs_path)
      if file_obj.mime_type is None:
        file_obj.mime_type, _ = mimetypes.guess_type(fs_path)
      if file_obj.mime_type is None:
        raise ValueError(
            'Unknown mime type: Could not determine the mimetype for your'
            ' file\n    please set the `mime_type` argument'
        )
      upload_source = fs_path

    # Headers sent with the create request that starts the resumable upload.
    upload_headers = {
        'Content-Type': 'application/json',
        'X-Goog-Upload-Protocol': 'resumable',
        'X-Goog-Upload-Command': 'start',
        'X-Goog-Upload-Header-Content-Length': f'{file_obj.size_bytes}',
        'X-Goog-Upload-Header-Content-Type': f'{file_obj.mime_type}',
    }
    http_options: types.HttpOptions
    if config_model and config_model.http_options:
      # Work on a copy: assigning to the caller's HttpOptions would overwrite
      # the `api_version` and `headers` of an object the caller may reuse.
      http_options = config_model.http_options.model_copy(
          update={'api_version': '', 'headers': upload_headers}
      )
    else:
      http_options = types.HttpOptions(api_version='', headers=upload_headers)

    response = self._create(
        file=file_obj,
        config=types.CreateFileConfig(
            http_options=http_options, should_return_http_response=True
        ),
    )

    if (
        response.sdk_http_response is None
        or response.sdk_http_response.headers is None
        or 'x-goog-upload-url' not in response.sdk_http_response.headers
    ):
      raise KeyError(
          'Failed to create file. Upload URL did not returned from the create'
          ' file request.'
      )
    upload_url = response.sdk_http_response.headers['x-goog-upload-url']

    return_file = self._api_client.upload_file(
        upload_source,
        upload_url,
        file_obj.size_bytes,
        http_options=http_options,
    )

    return types.File._from_response(
        response=_File_from_mldev(return_file.json['file']),
        kwargs=config_model.model_dump() if config else {},
    )

  def list(
      self, *, config: Optional[types.ListFilesConfigOrDict] = None
  ) -> Pager[types.File]:
    """Lists files from the service and returns them wrapped in a pager.

    The first page is requested immediately through `_list`.

    Args:
      config (ListFilesConfig): Optional, configuration for the list method.

    Returns:
      Pager[File]: A pager built from the first response, the `_list` method
      and `config`.
    """
    first_response = self._list(config=config)
    return Pager('files', self._list, first_response, config)

  def download(
      self,
      *,
      file: Union[str, types.File, types.Video, types.GeneratedVideo],
      config: Optional[types.DownloadFileConfigOrDict] = None,
  ) -> bytes:
    """Downloads a file's data from storage.

    Files created by `upload` can't be downloaded. You can tell which files are
    downloadable by checking the `source` or `download_uri` property.

    Note: This method returns the data as bytes. For `Video` and
    `GeneratedVideo` objects there is an additional side effect, that it also
    sets the `video_bytes` property on the `Video` object.

    Args:
      file (str | File | Video | GeneratedVideo): A file name, uri, or file
        object. Identifying which file to download.
      config (DownloadFileConfigOrDict): Optional, configuration for the
        download method.

    Returns:
      bytes: The file data.

    Raises:
      ValueError: If the client is a Vertex AI client, or if `file` is a
        `File` whose `download_uri` is None.

    Usage:

    .. code-block:: python

      for file in client.files.list():
        if file.download_uri is not None:
          break
      else:
        raise ValueError('No files found with a `download_uri`.')
      data = client.files.download(file=file)
      # data = client.files.download(file=file.name)
      # data = client.files.download(file=file.download_uri)

      video = types.Video(uri=file.uri)
      video_bytes = client.files.download(file=video)
      video.video_bytes
    """
    if self._api_client.vertexai:
      raise ValueError(
          'This method is only supported in the Gemini Developer client.'
      )

    config_model = None
    if config:
      if isinstance(config, dict):
        config_model = types.DownloadFileConfig(**config)
      else:
        config_model = config

    if isinstance(file, types.File) and file.download_uri is None:
      raise ValueError(
          "Only generated files can be downloaded, uploaded files can't be "
          'downloaded. You can tell which files are downloadable by checking '
          'the `source` or `download_uri` property.'
      )
    name = t.t_file_name(file)

    query_params = {'alt': 'media'}
    path = f'files/{name}:download?{urlencode(query_params)}'

    # Only consult the config when one was given, instead of relying on
    # `getv` to tolerate a None object.
    http_options = None
    if config_model is not None:
      http_options = getv(config_model, ['http_options'])

    data = self._api_client.download_file(
        path,
        http_options=http_options,
    )

    # Side effect: attach the downloaded bytes to the video object passed in.
    if isinstance(file, types.Video):
      file.video_bytes = data
    elif isinstance(file, types.GeneratedVideo) and file.video is not None:
      file.video.video_bytes = data

    return data


class AsyncFiles(_api_module.BaseModule):

  async def _list(
      self, *, config: Optional[types.ListFilesConfigOrDict] = None
  ) -> types.ListFilesResponse:
    """Lists files from the service with a single asynchronous GET request.

    Args:
      config (ListFilesConfig): Optional, configuration for the list method.

    Returns:
      ListFilesResponse: The response for the list method, with
      `sdk_http_response` set from the response headers.

    Raises:
      ValueError: If the client is a Vertex AI client.

    Usage:

    .. code-block:: python

      pager = await client.aio.files.list(config={'page_size': 10})
      for file in pager.page:
        print(file.name)
    """
    params = types._ListFilesParameters(config=config)

    if self._api_client.vertexai:
      raise ValueError(
          'This method is only supported in the Gemini Developer client.'
      )

    request_dict = _ListFilesParameters_to_mldev(params)
    url_values: Optional[dict[str, str]] = request_dict.get('_url')
    path = 'files'
    if url_values:
      path = path.format_map(url_values)

    query = request_dict.get('_query')
    if query:
      path = f'{path}?{urlencode(query)}'
    # TODO: remove the hack that pops config.
    request_dict.pop('config', None)

    # Per-call HTTP options come from the validated config, when present.
    list_config = params.config
    http_options: Optional[types.HttpOptions] = (
        None if list_config is None else list_config.http_options
    )

    request_dict = _common.encode_unserializable_types(
        _common.convert_to_dict(request_dict)
    )

    response = await self._api_client.async_request(
        'get', path, request_dict, http_options
    )

    response_dict = json.loads(response.body) if response.body else ''

    if not self._api_client.vertexai:
      response_dict = _ListFilesResponse_from_mldev(response_dict)

    result = types.ListFilesResponse._from_response(
        response=response_dict, kwargs=params.model_dump()
    )
    result.sdk_http_response = types.HttpResponse(headers=response.headers)
    self._api_client._verify_response(result)
    return result

  async def _create(
      self,
      *,
      file: types.FileOrDict,
      config: Optional[types.CreateFileConfigOrDict] = None,
  ) -> types.CreateFileResponse:
    """Sends the create-file request to `upload/v1beta/files` asynchronously.

    Args:
      file: The file metadata to send in the request body.
      config: Optional configuration. Its `http_options` are used for the
        request. When `should_return_http_response` is truthy, the raw
        response is returned in `sdk_http_response` and the body is not
        parsed. Both are honored for model and dict configs alike.

    Returns:
      CreateFileResponse: Either a wrapper around the raw HTTP response or
      the response parsed from the body.

    Raises:
      ValueError: If the client is a Vertex AI client.
    """
    parameter_model = types._CreateFileParameters(
        file=file,
        config=config,
    )

    if self._api_client.vertexai:
      raise ValueError(
          'This method is only supported in the Gemini Developer client.'
      )

    request_dict = _CreateFileParameters_to_mldev(parameter_model)
    request_url_dict: Optional[dict[str, str]] = request_dict.get('_url')
    if request_url_dict:
      path = 'upload/v1beta/files'.format_map(request_url_dict)
    else:
      path = 'upload/v1beta/files'

    query_params = request_dict.get('_query')
    if query_params:
      path = f'{path}?{urlencode(query_params)}'
    # TODO: remove the hack that pops config.
    request_dict.pop('config', None)

    http_options: Optional[types.HttpOptions] = None
    if (
        parameter_model.config is not None
        and parameter_model.config.http_options is not None
    ):
      http_options = parameter_model.config.http_options

    request_dict = _common.convert_to_dict(request_dict)
    request_dict = _common.encode_unserializable_types(request_dict)

    response = await self._api_client.async_request(
        'post', path, request_dict, http_options
    )

    # Read the flag from the validated config rather than the raw argument:
    # `getattr` on a dict config never finds the key, so the flag used to be
    # ignored whenever the config was passed as a dict.
    if getattr(parameter_model.config, 'should_return_http_response', None):
      return_value = types.CreateFileResponse(sdk_http_response=response)
      self._api_client._verify_response(return_value)
      return return_value

    response_dict = '' if not response.body else json.loads(response.body)

    if not self._api_client.vertexai:
      response_dict = _CreateFileResponse_from_mldev(response_dict)

    return_value = types.CreateFileResponse._from_response(
        response=response_dict, kwargs=parameter_model.model_dump()
    )

    self._api_client._verify_response(return_value)
    return return_value

  async def get(
      self, *, name: str, config: Optional[types.GetFileConfigOrDict] = None
  ) -> types.File:
    """Retrieves the file information from the service.

    Args:
      name (str): The name identifier for the file to retrieve.
      config (GetFileConfig): Optional, configuration for the get method.

    Returns:
      File: The file information.

    Raises:
      ValueError: If the client is a Vertex AI client.

    Usage:

    .. code-block:: python

      file = await client.aio.files.get(name='files/...')
      print(file.uri)
    """
    params = types._GetFileParameters(name=name, config=config)

    if self._api_client.vertexai:
      raise ValueError(
          'This method is only supported in the Gemini Developer client.'
      )

    request_dict = _GetFileParameters_to_mldev(params)
    url_values: Optional[dict[str, str]] = request_dict.get('_url')
    path = 'files/{file}'
    if url_values:
      path = path.format_map(url_values)

    query = request_dict.get('_query')
    if query:
      path = f'{path}?{urlencode(query)}'
    # TODO: remove the hack that pops config.
    request_dict.pop('config', None)

    # Per-call HTTP options come from the validated config, when present.
    get_config = params.config
    http_options: Optional[types.HttpOptions] = (
        None if get_config is None else get_config.http_options
    )

    request_dict = _common.encode_unserializable_types(
        _common.convert_to_dict(request_dict)
    )

    response = await self._api_client.async_request(
        'get', path, request_dict, http_options
    )

    response_dict = json.loads(response.body) if response.body else ''

    if not self._api_client.vertexai:
      response_dict = _File_from_mldev(response_dict)

    result = types.File._from_response(
        response=response_dict, kwargs=params.model_dump()
    )

    self._api_client._verify_response(result)
    return result

  async def delete(
      self, *, name: str, config: Optional[types.DeleteFileConfigOrDict] = None
  ) -> types.DeleteFileResponse:
    """Deletes a remotely stored file.

    Args:
      name (str): The name identifier for the file to delete.
      config (DeleteFileConfig): Optional, configuration for the delete method.

    Returns:
      DeleteFileResponse: The response for the delete method, with
      `sdk_http_response` set from the response headers.

    Raises:
      ValueError: If the client is a Vertex AI client.

    Usage:

    .. code-block:: python

      await client.aio.files.delete(name='files/...')
    """
    params = types._DeleteFileParameters(name=name, config=config)

    if self._api_client.vertexai:
      raise ValueError(
          'This method is only supported in the Gemini Developer client.'
      )

    request_dict = _DeleteFileParameters_to_mldev(params)
    url_values: Optional[dict[str, str]] = request_dict.get('_url')
    path = 'files/{file}'
    if url_values:
      path = path.format_map(url_values)

    query = request_dict.get('_query')
    if query:
      path = f'{path}?{urlencode(query)}'
    # TODO: remove the hack that pops config.
    request_dict.pop('config', None)

    # Per-call HTTP options come from the validated config, when present.
    delete_config = params.config
    http_options: Optional[types.HttpOptions] = (
        None if delete_config is None else delete_config.http_options
    )

    request_dict = _common.encode_unserializable_types(
        _common.convert_to_dict(request_dict)
    )

    response = await self._api_client.async_request(
        'delete', path, request_dict, http_options
    )

    response_dict = json.loads(response.body) if response.body else ''

    if not self._api_client.vertexai:
      response_dict = _DeleteFileResponse_from_mldev(response_dict)

    result = types.DeleteFileResponse._from_response(
        response=response_dict, kwargs=params.model_dump()
    )
    result.sdk_http_response = types.HttpResponse(headers=response.headers)
    self._api_client._verify_response(result)
    return result

  async def upload(
      self,
      *,
      file: Union[str, os.PathLike[str], io.IOBase],
      config: Optional[types.UploadFileConfigOrDict] = None,
  ) -> types.File:
    """Calls the API to upload a file asynchronously using a supported file service.

    Args:
      file: A path to the file or an `IOBase` object to be uploaded. If it's an
        IOBase object, it must be opened in blocking (the default) mode and
        binary mode. In other words, do not use non-blocking mode or text mode.
        The given stream must be seekable, that is, it must be able to call
        `seek()` on `file`.
      config: Optional parameters to set `display_name`, `mime_type`, and
        `name`.

    Returns:
      File: The file built from the `file` entry of the upload response.

    Raises:
      ValueError: If the client is a Vertex AI client, if the mime type was
        not given and could not be guessed from the path, or if the stream
        reports a non-binary `mode`.
      FileNotFoundError: If `file` is a path that is not an existing file.
      KeyError: If the create-file response carries no upload URL header.
    """
    if self._api_client.vertexai:
      raise ValueError(
          'This method is only supported in the Gemini Developer client.'
      )
    config_model = types.UploadFileConfig()
    if config:
      if isinstance(config, dict):
        config_model = types.UploadFileConfig(**config)
      else:
        config_model = config
      file_obj = types.File(
          mime_type=config_model.mime_type,
          name=config_model.name,
          display_name=config_model.display_name,
      )
    else:  # if not config
      file_obj = types.File()
    if file_obj.name is not None and not file_obj.name.startswith('files/'):
      file_obj.name = f'files/{file_obj.name}'

    unknown_mime_type_message = (
        'Unknown mime type: Could not determine the mimetype for your'
        ' file\n    please set the `mime_type` argument'
    )

    # What gets handed to the uploader: the stream itself or the resolved path.
    upload_source: Union[str, io.IOBase]
    if isinstance(file, io.IOBase):
      # A stream has no name to guess from, so the mime type must be explicit.
      if file_obj.mime_type is None:
        raise ValueError(unknown_mime_type_message)
      if hasattr(file, 'mode'):
        if 'b' not in file.mode:
          raise ValueError('The file must be opened in binary mode.')
      # Size is measured from the current position to the end of the stream;
      # the position is restored afterwards.
      offset = file.tell()
      file.seek(0, os.SEEK_END)
      file_obj.size_bytes = file.tell() - offset
      file.seek(offset, os.SEEK_SET)
      upload_source = file
    else:
      fs_path = os.fspath(file)
      if not fs_path or not os.path.isfile(fs_path):
        raise FileNotFoundError(f'{file} is not a valid file path.')
      file_obj.size_bytes = os.path.getsize(fs_path)
      if file_obj.mime_type is None:
        file_obj.mime_type, _ = mimetypes.guess_type(fs_path)
      if file_obj.mime_type is None:
        raise ValueError(unknown_mime_type_message)
      upload_source = fs_path

    # Headers that start a resumable upload session.
    upload_headers = {
        'Content-Type': 'application/json',
        'X-Goog-Upload-Protocol': 'resumable',
        'X-Goog-Upload-Command': 'start',
        'X-Goog-Upload-Header-Content-Length': f'{file_obj.size_bytes}',
        'X-Goog-Upload-Header-Content-Type': f'{file_obj.mime_type}',
    }
    http_options: types.HttpOptions
    if config_model and config_model.http_options:
      # Work on a copy: overwriting `api_version` and `headers` on the
      # caller's own object would leak the upload-specific values into any
      # later request that reuses the same options.
      http_options = config_model.http_options.model_copy()
      http_options.api_version = ''
      http_options.headers = upload_headers
    else:
      http_options = types.HttpOptions(
          api_version='',
          headers=upload_headers,
      )

    response = await self._create(
        file=file_obj,
        config=types.CreateFileConfig(
            http_options=http_options, should_return_http_response=True
        ),
    )

    missing_upload_url_message = (
        'Failed to create file. Upload URL did not returned from the create'
        ' file request.'
    )
    if (
        response.sdk_http_response is None
        or response.sdk_http_response.headers is None
    ):
      raise KeyError(missing_upload_url_message)
    response_headers = response.sdk_http_response.headers
    # The header may arrive lower-cased or in its canonical casing.
    if 'x-goog-upload-url' in response_headers:
      upload_url = response_headers['x-goog-upload-url']
    elif 'X-Goog-Upload-URL' in response_headers:
      upload_url = response_headers['X-Goog-Upload-URL']
    else:
      raise KeyError(missing_upload_url_message)

    return_file = await self._api_client.async_upload_file(
        upload_source,
        upload_url,
        file_obj.size_bytes,
        http_options=http_options,
    )

    return types.File._from_response(
        response=_File_from_mldev(return_file.json['file']),
        kwargs=config_model.model_dump() if config else {},
    )

  async def list(
      self, *, config: Optional[types.ListFilesConfigOrDict] = None
  ) -> AsyncPager[types.File]:
    """Lists files, returning them as an async pager.

    The first page is requested here, before the pager is built; the pager is
    given `self._list` and `config` alongside that first response.

    Args:
      config (ListFilesConfigOrDict): Optional, configuration for the list
        method.

    Returns:
      AsyncPager[File]: A pager over the `files` entries of the responses.
    """
    first_page = await self._list(config=config)
    return AsyncPager('files', self._list, first_page, config)

  async def download(
      self,
      *,
      file: Union[str, types.File],
      config: Optional[types.DownloadFileConfigOrDict] = None,
  ) -> bytes:
    """Downloads a file's data from the file service.

    The Vertex-AI implementation of the API does not include the file service.

    Files created by `upload` can't be downloaded. You can tell which files are
    downloadable by checking the `download_uri` property.

    Args:
      file (str | File): A file name, uri, or file object. Identifying which
        file to download.
      config (DownloadFileConfigOrDict): Optional, configuration for the
        download method.

    Returns:
      bytes: The file data as bytes.

    Raises:
      ValueError: If the client is a Vertex AI client.

    Usage:

    .. code-block:: python

      async for file in await client.aio.files.list():
        if file.download_uri is not None:
          break
      else:
        raise ValueError('No files found with a `download_uri`.')
      data = await client.aio.files.download(file=file)
      # data = await client.aio.files.download(file=file.name)
      # data = await client.aio.files.download(file=file.uri)
    """
    if self._api_client.vertexai:
      raise ValueError(
          'This method is only supported in the Gemini Developer client.'
      )

    # Normalise a dict config into the model; an absent/empty config stays None.
    config_model = None
    if config:
      config_model = (
          types.DownloadFileConfig(**config)
          if isinstance(config, dict)
          else config
      )

    name = t.t_file_name(file)

    # `alt=media` is always appended to the download path.
    query_string = urlencode({'alt': 'media'})
    path = f'files/{name}:download?{query_string}'

    http_options = getv(config_model, ['http_options'])

    return await self._api_client.async_download_file(
        path,
        http_options=http_options,
    )
