openaivec.embeddings

AsyncBatchEmbeddings dataclass

Thin wrapper around the OpenAI /embeddings endpoint using async operations.

This class provides an asynchronous interface for generating embeddings using OpenAI models. It manages concurrency, handles rate limits automatically, and efficiently processes batches of inputs, including de-duplication.

Example
import asyncio
import numpy as np
from openai import AsyncOpenAI
from openaivec.embeddings import AsyncBatchEmbeddings

# Assuming openai_async_client is an initialized AsyncOpenAI client
openai_async_client = AsyncOpenAI() # Replace with your actual client initialization

embedder = AsyncBatchEmbeddings(
    client=openai_async_client,
    model_name="text-embedding-3-small",
    max_concurrency=8  # Limit concurrent requests
)
texts = ["This is the first document.", "This is the second document.", "This is the first document."]

# Asynchronous call
async def main():
    embeddings = await embedder.create(texts, batch_size=128)
    # embeddings will be a list of numpy arrays (float32)
    # The embedding for the third text will be identical to the first
    # due to automatic de-duplication.
    print(f"Generated {len(embeddings)} embeddings.")
    print(f"Shape of first embedding: {embeddings[0].shape}")
    assert np.array_equal(embeddings[0], embeddings[2])

# Run the async function
asyncio.run(main())

Attributes:

Name             Type         Description
client           AsyncOpenAI  An already-configured openai.AsyncOpenAI client.
model_name       str          The model identifier, e.g. "text-embedding-3-small".
max_concurrency  int          Maximum number of concurrent requests to the OpenAI API.

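Because the concurrency limit is enforced by a semaphore stored on the instance, concurrent create calls on the same embedder share a single cap on in-flight requests. A minimal sketch of that behaviour (the corpora and helper function below are illustrative, not part of the library):

import asyncio
from openai import AsyncOpenAI
from openaivec.embeddings import AsyncBatchEmbeddings

async def embed_two_corpora():
    embedder = AsyncBatchEmbeddings(
        client=AsyncOpenAI(),
        model_name="text-embedding-3-small",
        max_concurrency=8,
    )
    docs_a = ["alpha document", "beta document"]
    docs_b = ["gamma document", "delta document"]
    # Both create() calls run concurrently, but the shared semaphore keeps
    # the total number of simultaneous API requests at or below 8.
    return await asyncio.gather(
        embedder.create(docs_a, batch_size=128),
        embedder.create(docs_b, batch_size=128),
    )

embeddings_a, embeddings_b = asyncio.run(embed_two_corpora())
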
Source code in src/openaivec/embeddings.py
@dataclass(frozen=True)
class AsyncBatchEmbeddings:
    """Thin wrapper around the OpenAI /embeddings endpoint using async operations.

    This class provides an asynchronous interface for generating embeddings using
    OpenAI models. It manages concurrency, handles rate limits automatically,
    and efficiently processes batches of inputs, including de-duplication.

    Example:
        ```python
        import asyncio
        import numpy as np
        from openai import AsyncOpenAI
        from openaivec.embeddings import AsyncBatchEmbeddings

        # Assuming openai_async_client is an initialized AsyncOpenAI client
        openai_async_client = AsyncOpenAI() # Replace with your actual client initialization

        embedder = AsyncBatchEmbeddings(
            client=openai_async_client,
            model_name="text-embedding-3-small",
            max_concurrency=8  # Limit concurrent requests
        )
        texts = ["This is the first document.", "This is the second document.", "This is the first document."]

        # Asynchronous call
        async def main():
            embeddings = await embedder.create(texts, batch_size=128)
            # embeddings will be a list of numpy arrays (float32)
            # The embedding for the third text will be identical to the first
            # due to automatic de-duplication.
            print(f"Generated {len(embeddings)} embeddings.")
            print(f"Shape of first embedding: {embeddings[0].shape}")
            assert np.array_equal(embeddings[0], embeddings[2])

        # Run the async function
        asyncio.run(main())
        ```

    Attributes:
        client: An already‑configured ``openai.AsyncOpenAI`` client.
        model_name: The model identifier, e.g. ``"text-embedding-3-small"``.
        max_concurrency: Maximum number of concurrent requests to the OpenAI API.
    """

    client: AsyncOpenAI
    model_name: str
    max_concurrency: int = 8  # Default concurrency limit
    _semaphore: asyncio.Semaphore = field(init=False, repr=False)

    def __post_init__(self):
        # Initialize the semaphore after the object is created
        # Use object.__setattr__ because the dataclass is frozen
        object.__setattr__(self, "_semaphore", asyncio.Semaphore(self.max_concurrency))

    @observe(_LOGGER)
    @backoff_async(exception=RateLimitError, scale=15, max_retries=8)
    async def _embed_chunk(self, inputs: List[str]) -> List[NDArray[np.float32]]:
        """Embed one minibatch of sentences asynchronously, respecting concurrency limits.

        This private helper handles the actual API call for a batch of inputs.
        Exponential back-off is applied automatically when ``openai.RateLimitError``
        is raised.

        Args:
            inputs: Input strings to be embedded. Duplicates are allowed.

        Returns:
            List of embedding vectors (``np.ndarray`` with dtype ``float32``)
            in the same order as *inputs*.

        Raises:
            openai.RateLimitError: Propagated if retries are exhausted.
        """
        # Acquire semaphore before making the API call
        async with self._semaphore:
            responses = await self.client.embeddings.create(input=inputs, model=self.model_name)
            return [np.array(d.embedding, dtype=np.float32) for d in responses.data]

    @observe(_LOGGER)
    async def create(self, inputs: List[str], batch_size: int) -> List[NDArray[np.float32]]:
        """Asynchronous public API: generate embeddings for a list of inputs.

        Uses the ``map_async`` helper to efficiently handle batching and de-duplication.

        Args:
            inputs: A list of input strings. Duplicates are handled efficiently.
            batch_size: Maximum number of unique inputs per API call.

        Returns:
            A list of ``np.ndarray`` objects (dtype ``float32``) where each entry
            is the embedding of the corresponding string in *inputs*.

        Raises:
            openai.RateLimitError: Propagated if retries are exhausted during API calls.
        """

        return await map_async(inputs, self._embed_chunk, batch_size)

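Both _embed_chunk implementations rely on a back-off decorator (backoff_async here, backoff in the synchronous class below) to ride out rate limits. The decorator's body is not shown on this page; the following is only an illustrative sketch of the exponential-retry pattern that its parameters (exception, scale, max_retries) suggest, not the library's actual implementation:

import asyncio
import functools
import random

def backoff_async_sketch(exception: type, scale: float, max_retries: int):
    """Retry an async callable with jittered exponential back-off (illustrative)."""
    def decorator(func):
        @functools.wraps(func)
        async def wrapper(*args, **kwargs):
            for attempt in range(max_retries):
                try:
                    return await func(*args, **kwargs)
                except exception:
                    if attempt == max_retries - 1:
                        raise  # retries exhausted: propagate to the caller
                    # Wait longer after each failure; jitter avoids
                    # synchronized retries across concurrent tasks.
                    await asyncio.sleep(scale * (2 ** attempt) * random.random())
        return wrapper
    return decorator
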
create(inputs, batch_size) async

Asynchronous public API: generate embeddings for a list of inputs.

Uses the map_async helper to efficiently handle batching and de-duplication.

Parameters:

Name        Type       Description                                                    Default
inputs      List[str]  A list of input strings. Duplicates are handled efficiently.   required
batch_size  int        Maximum number of unique inputs per API call.                  required

Returns:

Type                     Description
List[NDArray[float32]]   A list of np.ndarray objects (dtype float32) where each entry is the embedding of the corresponding string in inputs.

Raises:

Type            Description
RateLimitError  Propagated if retries are exhausted during API calls.

Source code in src/openaivec/embeddings.py
@observe(_LOGGER)
async def create(self, inputs: List[str], batch_size: int) -> List[NDArray[np.float32]]:
    """Asynchronous public API: generate embeddings for a list of inputs.

    Uses the ``map_async`` helper to efficiently handle batching and de-duplication.

    Args:
        inputs: A list of input strings. Duplicates are handled efficiently.
        batch_size: Maximum number of unique inputs per API call.

    Returns:
        A list of ``np.ndarray`` objects (dtype ``float32``) where each entry
        is the embedding of the corresponding string in *inputs*.

    Raises:
        openai.RateLimitError: Propagated if retries are exhausted during API calls.
    """

    return await map_async(inputs, self._embed_chunk, batch_size)

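De-duplication means each distinct input string is embedded exactly once; duplicates are filled in from the cached result when the output list is assembled. The following sketch shows that batching-plus-deduplication pattern in isolation. It is an illustration of the behaviour documented above, not the actual map_async implementation:

import asyncio
from typing import Awaitable, Callable, List, TypeVar

S = TypeVar("S")
T = TypeVar("T")

async def map_async_sketch(
    inputs: List[S],
    f: Callable[[List[S]], Awaitable[List[T]]],
    batch_size: int,
) -> List[T]:
    # Drop duplicates while preserving first-seen order.
    unique = list(dict.fromkeys(inputs))
    # Split the unique values into minibatches of at most batch_size items.
    batches = [unique[i : i + batch_size] for i in range(0, len(unique), batch_size)]
    # Process all minibatches concurrently; in AsyncBatchEmbeddings the
    # semaphore inside _embed_chunk caps how many actually run at once.
    chunk_results = await asyncio.gather(*(f(batch) for batch in batches))
    # Map every original input, duplicates included, back to its result.
    lookup = dict(zip(unique, (r for chunk in chunk_results for r in chunk)))
    return [lookup[x] for x in inputs]
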
BatchEmbeddings dataclass

Thin wrapper around the OpenAI /embeddings endpoint.

Attributes:

Name        Type    Description
client      OpenAI  An already-configured openai.OpenAI client.
model_name  str     The model identifier, e.g. "text-embedding-3-small".

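Unlike its async counterpart, BatchEmbeddings ships without a usage example. A minimal sketch mirroring the async example above (client setup and sample texts are illustrative):

from openai import OpenAI
from openaivec.embeddings import BatchEmbeddings

client = OpenAI()  # replace with your configured client

embedder = BatchEmbeddings(client=client, model_name="text-embedding-3-small")
texts = ["This is the first document.", "This is the second document.", "This is the first document."]

# Blocking call; inputs are sent in minibatches of at most 128 per request.
embeddings = embedder.create(texts, batch_size=128)
print(len(embeddings))        # 3 vectors, one per input string
print(embeddings[0].shape)    # embedding dimension, e.g. (1536,) for text-embedding-3-small
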
Source code in src/openaivec/embeddings.py
@dataclass(frozen=True)
class BatchEmbeddings:
    """Thin wrapper around the OpenAI /embeddings endpoint.

    Attributes:
        client: An already‑configured ``openai.OpenAI`` client.
        model_name: The model identifier, e.g. ``"text-embedding-3-small"``.
    """

    client: OpenAI
    model_name: str

    @observe(_LOGGER)
    @backoff(exception=RateLimitError, scale=15, max_retries=8)
    def _embed_chunk(self, inputs: List[str]) -> List[NDArray[np.float32]]:
        """Embed one minibatch of sentences.

        This private helper is the unit of work used by the map/parallel
        utilities.  Exponential back‑off is applied automatically when
        ``openai.RateLimitError`` is raised.

        Args:
            inputs: Input strings to be embedded.  Duplicates are allowed; the
                implementation may decide to de‑duplicate internally.

        Returns:
            List of embedding vectors with the same ordering as *inputs*.
        """
        responses = self.client.embeddings.create(input=inputs, model=self.model_name)
        return [np.array(d.embedding, dtype=np.float32) for d in responses.data]

    @observe(_LOGGER)
    def create(self, inputs: List[str], batch_size: int) -> List[NDArray[np.float32]]:
        """See ``VectorizedEmbeddings.create`` for contract details.

        The call is internally delegated to the ``map`` batching helper, which
        splits *inputs* into minibatches of at most *batch_size* items per request.

        Args:
            inputs: A list of input strings. Duplicates are allowed; the
                implementation may decide to de‑duplicate internally.
            batch_size: Maximum number of inputs to be sent to the underlying
                model in one request.

        Returns:
            A list of ``np.ndarray`` objects (dtype ``float32``) where each entry
            is the embedding of the corresponding string in *inputs*.

        Raises:
            openai.RateLimitError: Propagated if retries are exhausted.
        """
        return map(inputs, self._embed_chunk, batch_size)

create(inputs, batch_size)

See VectorizedEmbeddings.create for contract details.

The call is internally delegated to the map batching helper, which splits inputs into minibatches of at most batch_size items per request.

Parameters:

Name        Type       Description                                                                                Default
inputs      List[str]  A list of input strings. Duplicates are allowed; they may be de-duplicated internally.     required
batch_size  int        Maximum number of inputs to be sent to the underlying model in one request.                required

Returns:

Type                     Description
List[NDArray[float32]]   A list of np.ndarray objects (dtype float32) where each entry is the embedding of the corresponding string in inputs.

Raises:

Type            Description
RateLimitError  Propagated if retries are exhausted.

Source code in src/openaivec/embeddings.py
@observe(_LOGGER)
def create(self, inputs: List[str], batch_size: int) -> List[NDArray[np.float32]]:
    """See ``VectorizedEmbeddings.create`` for contract details.

    The call is internally delegated to the ``map`` batching helper, which
    splits *inputs* into minibatches of at most *batch_size* items per request.

    Args:
        inputs: A list of input strings. Duplicates are allowed; the
            implementation may decide to de‑duplicate internally.
        batch_size: Maximum number of inputs to be sent to the underlying
            model in one request.

    Returns:
        A list of ``np.ndarray`` objects (dtype ``float32``) where each entry
        is the embedding of the corresponding string in *inputs*.

    Raises:
        openai.RateLimitError: Propagated if retries are exhausted.
    """
    return map(inputs, self._embed_chunk, batch_size)