openaivec.util

TextChunker dataclass

Utility for splitting text into token‑bounded chunks.

Source code in src/openaivec/util.py
@dataclass(frozen=True)
class TextChunker:
    """Utility for splitting text into token‑bounded chunks."""

    enc: tiktoken.Encoding

    def split(self, original: str, max_tokens: int, sep: List[str]) -> List[str]:
        """Token‑aware sentence segmentation.

        The text is first split by the given separators, then greedily packed
        into chunks whose token counts do not exceed ``max_tokens``.

        Args:
            original (str): Original text to split.
            max_tokens (int): Maximum number of tokens allowed per chunk.
            sep (List[str]): List of separator patterns used by
                :pyfunc:`re.split`.

        Returns:
            List[str]: List of text chunks respecting the ``max_tokens`` limit.
        """
        sentences = re.split(f"({'|'.join(sep)})", original)
        sentences = [s.strip() for s in sentences if s.strip()]
        sentences = [(s, len(self.enc.encode(s))) for s in sentences]

        chunks = []
        sentence = ""
        token_count = 0
        for s, n in sentences:
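            # Flush the current chunk before adding a fragment would push it past the limit.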
            if token_count + n > max_tokens:
                if sentence:
                    chunks.append(sentence)
                sentence = ""
                token_count = 0

            sentence += s
            token_count += n

        if sentence:
            chunks.append(sentence)

        return chunks

split(original, max_tokens, sep)

Token‑aware sentence segmentation.

The text is first split by the given separators, then greedily packed into chunks whose token counts do not exceed max_tokens. A single fragment that is itself longer than max_tokens becomes its own chunk, so that chunk can still exceed the limit.

Parameters:

    original (str): Original text to split. Required.
    max_tokens (int): Maximum number of tokens allowed per chunk. Required.
    sep (List[str]): List of separator patterns used by re.split. Required.

Returns:

    List[str]: List of text chunks respecting the max_tokens limit.

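For illustration, a minimal usage sketch, not taken from the library's own documentation: it assumes tiktoken is installed and uses the cl100k_base encoding as an arbitrary choice.

import tiktoken

from openaivec.util import TextChunker

chunker = TextChunker(enc=tiktoken.get_encoding("cl100k_base"))
text = "First sentence. Second sentence. Third sentence."
# Each entry of sep is a regex alternative; fragments are greedily packed
# into chunks of at most eight tokens each.
chunks = chunker.split(text, max_tokens=8, sep=[r"\. "])
print(chunks)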

backoff(exception, scale=None, max_retries=None)

Decorator implementing exponential back‑off retry logic.

Parameters:

    exception (Exception): Exception type that triggers a retry. Required.
    scale (int | None): Initial scale parameter for the exponential jitter.
        This scale is used as the mean of the first delay's exponential
        distribution and doubles with each subsequent retry. If None, an
        initial scale of 1.0 is used. Defaults to None.
    max_retries (int | None): Maximum number of retries. None means retry
        indefinitely. Defaults to None.

Returns:

    Callable[..., V]: A decorated function that retries on the specified
        exception with exponential back-off.

Raises:

    exception: Re-raised when the maximum number of retries is exceeded.

Source code in src/openaivec/util.py
def backoff(exception: Exception, scale: int | None = None, max_retries: int | None = None) -> Callable[..., V]:
    """Decorator implementing exponential back‑off retry logic.

    Args:
        exception (Exception): Exception type that triggers a retry.
        scale (int | None): Initial scale parameter for the exponential jitter.
            This scale is used as the mean for the first delay's exponential
            distribution and doubles with each subsequent retry. If ``None``,
            an initial scale of 1.0 is used.
        max_retries (Optional[int]): Maximum number of retries. ``None`` means
            retry indefinitely.

    Returns:
        Callable[..., V]: A decorated function that retries on the specified
            exception with exponential back‑off.

    Raises:
        exception: Re‑raised when the maximum number of retries is exceeded.
    """

    def decorator(func: Callable[..., V]) -> Callable[..., V]:
        @functools.wraps(func)
        def wrapper(*args, **kwargs) -> V:
            attempt = 0
            # Initialize the scale for the exponential backoff. This scale will double with each retry.
            # If the input 'scale' is None, default to 1.0. This 'scale' is the mean of the exponential distribution.
            current_jitter_scale = float(scale) if scale is not None else 1.0

            while True:
                try:
                    return func(*args, **kwargs)
                except exception:
                    attempt += 1
                    if max_retries is not None and attempt >= max_retries:
                        raise

                    # Get the sleep interval with exponential jitter, using the current scale
                    interval = get_exponential_with_cutoff(current_jitter_scale)
                    time.sleep(interval)

                    # Double the scale for the next potential retry
                    current_jitter_scale *= 2

        return wrapper

    return decorator
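
A minimal usage sketch; flaky_fetch is a hypothetical function invented for illustration, and only the decorator itself comes from openaivec.util.

import random

from openaivec.util import backoff

@backoff(exception=ConnectionError, scale=1, max_retries=5)
def flaky_fetch() -> str:
    # Fails transiently; the decorator sleeps with exponential jitter
    # (mean delay doubling per attempt) and retries up to five times.
    if random.random() < 0.5:
        raise ConnectionError("transient failure")
    return "ok"

print(flaky_fetch())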

backoff_async(exception, scale=None, max_retries=None)

Asynchronous version of the backoff decorator.

Parameters:

    exception (Exception): Exception type that triggers a retry. Required.
    scale (int | None): Initial scale parameter for the exponential jitter.
        This scale is used as the mean of the first delay's exponential
        distribution and doubles with each subsequent retry. If None, an
        initial scale of 1.0 is used. Defaults to None.
    max_retries (int | None): Maximum number of retries. None means retry
        indefinitely. Defaults to None.

Returns:

    Callable[..., Awaitable[V]]: A decorated asynchronous function that
        retries on the specified exception with exponential back-off.

Raises:

    exception: Re-raised when the maximum number of retries is exceeded.

Source code in src/openaivec/util.py
def backoff_async(
    exception: Exception, scale: int | None = None, max_retries: int | None = None
) -> Callable[..., Awaitable[V]]:
    """Asynchronous version of the backoff decorator.

    Args:
        exception (Exception): Exception type that triggers a retry.
        scale (int | None): Initial scale parameter for the exponential jitter.
            This scale is used as the mean for the first delay's exponential
            distribution and doubles with each subsequent retry. If ``None``,
            an initial scale of 1.0 is used.
        max_retries (int | None): Maximum number of retries. ``None`` means
            retry indefinitely.

    Returns:
        Callable[..., Awaitable[V]]: A decorated asynchronous function that
            retries on the specified exception with exponential back‑off.

    Raises:
        exception: Re‑raised when the maximum number of retries is exceeded.
    """

    def decorator(func: Callable[..., Awaitable[V]]) -> Callable[..., Awaitable[V]]:
        @functools.wraps(func)
        async def wrapper(*args, **kwargs) -> V:
            attempt = 0
            # Initialize the scale for the exponential backoff. This scale will double with each retry.
            # If the input 'scale' is None, default to 1.0. This 'scale' is the mean of the exponential distribution.
            current_jitter_scale = float(scale) if scale is not None else 1.0

            while True:
                try:
                    return await func(*args, **kwargs)
                except exception:
                    attempt += 1
                    if max_retries is not None and attempt >= max_retries:
                        raise

                    # Get the sleep interval with exponential jitter, using the current scale
                    interval = get_exponential_with_cutoff(current_jitter_scale)
                    await asyncio.sleep(interval)

                    # Double the scale for the next potential retry
                    current_jitter_scale *= 2

        return wrapper

    return decorator
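
The same pattern in asynchronous code, again as a sketch with a hypothetical coroutine:

import asyncio
import random

from openaivec.util import backoff_async

@backoff_async(exception=TimeoutError, scale=1, max_retries=5)
async def flaky_call() -> str:
    # Fails transiently; retries are separated by awaited, jittered delays
    # rather than blocking sleeps.
    if random.random() < 0.5:
        raise TimeoutError("transient timeout")
    return "ok"

print(asyncio.run(flaky_call()))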

get_exponential_with_cutoff(scale)

Sample an exponential random variable with an upper cutoff.

A value is repeatedly drawn from an exponential distribution with rate 1/scale until it is smaller than 3 * scale.

Parameters:

    scale (float): Scale parameter of the exponential distribution. Required.

Returns:

    float: Sampled value bounded by 3 * scale.

Source code in src/openaivec/util.py
def get_exponential_with_cutoff(scale: float) -> float:
    """Sample an exponential random variable with an upper cutoff.

    A value is repeatedly drawn from an exponential distribution with rate
    ``1/scale`` until it is smaller than ``3 * scale``.

    Args:
        scale (float): Scale parameter of the exponential distribution.

    Returns:
        float: Sampled value bounded by ``3 * scale``.
    """
    gen = np.random.default_rng()

    while True:
        v = gen.exponential(scale)
        if v < scale * 3:
            return v
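
The cutoff is implemented by rejection sampling: any draw at or above 3 * scale is discarded and redrawn, so every returned value lies below that bound. A quick illustrative check:

from openaivec.util import get_exponential_with_cutoff

samples = [get_exponential_with_cutoff(2.0) for _ in range(10_000)]
print(max(samples) < 6.0)           # True: every sample is below 3 * scale
print(sum(samples) / len(samples))  # mean of the truncated distribution, below 2.0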

map(inputs, f, batch_size=128)

Map a function f over a list of inputs in batches.

This function deduplicates the inputs (keyed by the hash of their string representation), divides the unique inputs into smaller batches, and applies the function f to each batch. The results are gathered and returned in the same order as the original inputs, with duplicate inputs sharing a single computed output.

Parameters:

    inputs (List[T]): List of inputs to be processed.
    f (Callable[[List[T]], List[U]]): Function to apply. It takes a batch of
        inputs (List[T]) and must return a list of corresponding outputs
        (List[U]) of the same size.
    batch_size (int): Size of each batch for processing. Defaults to 128.

Returns:

    List[U]: List of outputs corresponding to the original inputs, in order.

Source code in src/openaivec/util.py
def map(inputs: List[T], f: Callable[[List[T]], List[U]], batch_size: int = 128) -> List[U]:
    """Map a function `f` over a list of inputs in batches.

    This function divides the input list into smaller batches and applies the
    function `f` to each batch. It gathers the results and returns them in the
    same order as the original inputs.

    Args:
        inputs (List[T]): List of inputs to be processed.
        f (Callable[[List[T]], List[U]]): Function to apply. It takes a batch of
            inputs (List[T]) and must return a list of corresponding outputs
            (List[U]) of the same size.
        batch_size (int): Size of each batch for processing.

    Returns:
        List[U]: List of outputs corresponding to the original inputs, in order.
    """
    original_hashes: List[int] = [hash(str(v)) for v in inputs]  # Use str(v) for hash if T is not hashable
    hash_inputs: Dict[int, T] = {k: v for k, v in zip(original_hashes, inputs)}
    unique_hashes: List[int] = list(hash_inputs.keys())
    unique_inputs: List[T] = list(hash_inputs.values())
    input_batches: List[List[T]] = [unique_inputs[i : i + batch_size] for i in range(0, len(unique_inputs), batch_size)]
    output_batches: List[List[U]] = [f(batch) for batch in input_batches]
    unique_outputs: List[U] = [u for batch in output_batches for u in batch]
    if len(unique_hashes) != len(unique_outputs):
        raise ValueError(
            f"Number of unique inputs ({len(unique_hashes)}) does not match number of unique outputs ({len(unique_outputs)}). Check the function f."
        )
    hash_outputs: Dict[int, U] = {k: v for k, v in zip(unique_hashes, unique_outputs)}
    outputs: List[U] = [hash_outputs[k] for k in original_hashes]
    return outputs
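
A usage sketch; double_batch is a hypothetical batch function, and the import is aliased because openaivec.util.map shadows the built-in map.

from openaivec.util import map as batch_map

def double_batch(batch: list[int]) -> list[int]:
    # Receives up to batch_size inputs at once and must return exactly one
    # output per input, in order.
    return [x * 2 for x in batch]

print(batch_map([1, 2, 3, 2, 1], double_batch, batch_size=2))
# [2, 4, 6, 4, 2] - the duplicated inputs are computed once and their
# results reused.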

map_async(inputs, f, batch_size=128) async

Asynchronously map a function f over a list of inputs in batches.

This function deduplicates the inputs (keyed by the hash of their string representation), divides the unique inputs into smaller batches, and applies the asynchronous function f to each batch concurrently via asyncio.gather. The results are gathered and returned in the same order as the original inputs, with duplicate inputs sharing a single computed output.

Parameters:

    inputs (List[T]): List of inputs to be processed.
    f (Callable[[List[T]], Awaitable[List[U]]]): Asynchronous function to
        apply. It takes a batch of inputs (List[T]) and must return a list of
        corresponding outputs (List[U]) of the same size.
    batch_size (int): Size of each batch for processing. Defaults to 128.

Returns:

    List[U]: List of outputs corresponding to the original inputs, in order.

Source code in src/openaivec/util.py
async def map_async(inputs: List[T], f: Callable[[List[T]], Awaitable[List[U]]], batch_size: int = 128) -> List[U]:
    """Asynchronously map a function `f` over a list of inputs in batches.

    This function divides the input list into smaller batches and applies the
    asynchronous function `f` to each batch concurrently. It gathers the results
    and returns them in the same order as the original inputs.

    Args:
        inputs (List[T]): List of inputs to be processed.
        f (Callable[[List[T]], Awaitable[List[U]]]): Asynchronous function to apply.
            It takes a batch of inputs (List[T]) and must return a list of
            corresponding outputs (List[U]) of the same size.
        batch_size (int): Size of each batch for processing.

    Returns:
        List[U]: List of outputs corresponding to the original inputs, in order.
    """
    original_hashes: List[int] = [hash(str(v)) for v in inputs]  # Use str(v) for hash if T is not hashable
    hash_inputs: Dict[int, T] = {k: v for k, v in zip(original_hashes, inputs)}
    unique_hashes: List[int] = list(hash_inputs.keys())
    unique_inputs: List[T] = list(hash_inputs.values())
    input_batches: List[List[T]] = [unique_inputs[i : i + batch_size] for i in range(0, len(unique_inputs), batch_size)]
    # Ensure f is awaited correctly within gather
    tasks = [f(batch) for batch in input_batches]
    output_batches: List[List[U]] = await asyncio.gather(*tasks)
    unique_outputs: List[U] = [u for batch in output_batches for u in batch]
    if len(unique_hashes) != len(unique_outputs):
        raise ValueError(
            f"Number of unique inputs ({len(unique_hashes)}) does not match number of unique outputs ({len(unique_outputs)}). Check the function f."
        )
    hash_outputs: Dict[int, U] = {k: v for k, v in zip(unique_hashes, unique_outputs)}
    outputs: List[U] = [hash_outputs[k] for k in original_hashes]
    return outputs
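
The asynchronous variant follows the same shape; double_batch is again a hypothetical batch coroutine.

import asyncio

from openaivec.util import map_async

async def double_batch(batch: list[int]) -> list[int]:
    await asyncio.sleep(0)  # stand-in for real asynchronous work
    return [x * 2 for x in batch]

print(asyncio.run(map_async([1, 2, 3, 2, 1], double_batch, batch_size=2)))
# [2, 4, 6, 4, 2]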