Spark Extension¶
openaivec.spark ¶
Asynchronous Spark UDFs for the OpenAI and Azure OpenAI APIs.
This module provides builder classes (ResponsesUDFBuilder, EmbeddingsUDFBuilder) for creating asynchronous Spark UDFs that communicate with either the public OpenAI API or Azure OpenAI using the openaivec.spark subpackage.
It supports UDFs for generating responses and creating embeddings asynchronously.
The UDFs operate on Spark DataFrames and leverage asyncio for potentially
improved performance in I/O-bound operations.
Setup¶
First, obtain a Spark session:
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
Next, instantiate UDF builders with your OpenAI API key (or Azure credentials) and model/deployment names, then register the desired UDFs:
import os
from openaivec.spark import ResponsesUDFBuilder, EmbeddingsUDFBuilder
from pydantic import BaseModel

# Option 1: Using OpenAI
resp_builder = ResponsesUDFBuilder.of_openai(
    api_key=os.getenv("OPENAI_API_KEY"),
    model_name="gpt-4o-mini",  # Model for responses
)
emb_builder = EmbeddingsUDFBuilder.of_openai(
    api_key=os.getenv("OPENAI_API_KEY"),
    model_name="text-embedding-3-small",  # Model for embeddings
)

# Option 2: Using Azure OpenAI
# resp_builder = ResponsesUDFBuilder.of_azure_openai(
#     api_key=os.getenv("AZURE_OPENAI_KEY"),
#     endpoint=os.getenv("AZURE_OPENAI_ENDPOINT"),
#     api_version=os.getenv("AZURE_OPENAI_API_VERSION"),
#     model_name="your-resp-deployment-name",  # Deployment for responses
# )
# emb_builder = EmbeddingsUDFBuilder.of_azure_openai(
#     api_key=os.getenv("AZURE_OPENAI_KEY"),
#     endpoint=os.getenv("AZURE_OPENAI_ENDPOINT"),
#     api_version=os.getenv("AZURE_OPENAI_API_VERSION"),
#     model_name="your-emb-deployment-name",  # Deployment for embeddings
# )

# Define a Pydantic model for structured responses (optional)
class Translation(BaseModel):
    en: str
    fr: str
    # ... other languages

# Register the asynchronous responses UDF
spark.udf.register(
    "translate_async",
    resp_builder.build(
        instructions="Translate the text to multiple languages.",
        response_format=Translation,
    ),
)

# Or use a predefined task with build_from_task method
from openaivec.task import nlp

spark.udf.register(
    "sentiment_async",
    resp_builder.build_from_task(nlp.SENTIMENT_ANALYSIS),
)

# Register the asynchronous embeddings UDF
spark.udf.register(
    "embed_async",
    emb_builder.build(),
)
You can now invoke the UDFs from Spark SQL:
SELECT
    text,
    translate_async(text) AS translation,
    sentiment_async(text) AS sentiment,
    embed_async(text) AS embedding
FROM your_table;
Note: This module provides asynchronous support through the pandas extensions.
ResponsesUDFBuilder dataclass ¶
Builder for asynchronous Spark pandas UDFs for generating responses.
Configures and builds UDFs that leverage pandas_ext.aio.responses
to generate text or structured responses from OpenAI models asynchronously.
An instance stores authentication parameters and the model name.
This builder supports two main methods:
- build(): Creates UDFs with custom instructions and response formats
- build_from_task(): Creates UDFs from predefined tasks (e.g., sentiment analysis)
Attributes:
Name | Type | Description |
---|---|---|
api_key | str | OpenAI or Azure API key. |
endpoint | Optional[str] | Azure endpoint base URL. None for public OpenAI. |
api_version | Optional[str] | Azure API version. Ignored for public OpenAI. |
model_name | str | Deployment name (Azure) or model name (OpenAI) for responses. |
Source code in src/openaivec/spark.py
of_openai classmethod ¶
Creates a builder configured for the public OpenAI API.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
api_key | str | The OpenAI API key. | required |
model_name | str | The OpenAI model name for responses (e.g., "gpt-4o-mini"). | required |
Returns:
Name | Type | Description |
---|---|---|
ResponsesUDFBuilder | ResponsesUDFBuilder | A builder instance configured for OpenAI responses. |
of_azure_openai classmethod ¶
of_azure_openai(
    api_key: str,
    endpoint: str,
    api_version: str,
    model_name: str,
) -> ResponsesUDFBuilder
Creates a builder configured for Azure OpenAI.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
api_key | str | The Azure OpenAI API key. | required |
endpoint | str | The Azure OpenAI endpoint URL. | required |
api_version | str | The Azure OpenAI API version (e.g., "2024-02-01"). | required |
model_name | str | The Azure OpenAI deployment name for responses. | required |
Returns:
Name | Type | Description |
---|---|---|
ResponsesUDFBuilder | ResponsesUDFBuilder | A builder instance configured for Azure OpenAI responses. |
build ¶
build(
    instructions: str,
    response_format: Type[T] = str,
    batch_size: int = 128,
    temperature: float = 0.0,
    top_p: float = 1.0,
    max_concurrency: int = 8,
) -> UserDefinedFunction
Builds the asynchronous pandas UDF for generating responses.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
instructions | str | The system prompt or instructions for the model. | required |
response_format | Type[T] | The desired output format. Either | str |
batch_size | int | Number of rows per async batch request passed to the underlying | 128 |
temperature | float | Sampling temperature (0.0 to 2.0). Defaults to 0.0. | 0.0 |
top_p | float | Nucleus sampling parameter. Defaults to 1.0. | 1.0 |
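The interplay of batch_size and max_concurrency can be pictured with a small asyncio sketch. This is an illustration under stated assumptions, not the library's implementation: split_into_batches, run_batches, and fake_api are hypothetical names, and fake_api merely simulates a network call.

```python
import asyncio
from typing import Awaitable, Callable, List


def split_into_batches(rows: List[str], batch_size: int) -> List[List[str]]:
    """Slice rows into consecutive batches of at most batch_size items."""
    return [rows[i:i + batch_size] for i in range(0, len(rows), batch_size)]


async def run_batches(
    rows: List[str],
    call_api: Callable[[List[str]], Awaitable[List[str]]],
    batch_size: int = 128,
    max_concurrency: int = 8,
) -> List[str]:
    """Send batches concurrently, with at most max_concurrency in flight."""
    semaphore = asyncio.Semaphore(max_concurrency)

    async def guarded(batch: List[str]) -> List[str]:
        async with semaphore:
            return await call_api(batch)

    batches = split_into_batches(rows, batch_size)
    # gather preserves input order, so results line up with the input rows.
    results = await asyncio.gather(*(guarded(batch) for batch in batches))
    return [item for batch in results for item in batch]


async def fake_api(batch: List[str]) -> List[str]:
    """Hypothetical stand-in for a network call; upper-cases each row."""
    await asyncio.sleep(0.01)
    return [text.upper() for text in batch]


rows = [f"row-{i}" for i in range(10)]
print(asyncio.run(run_batches(rows, fake_api, batch_size=4, max_concurrency=2)))
```

With these settings the ten rows form three batches, and no more than two of them wait on the simulated API at the same time. Larger batches mean fewer requests, while max_concurrency bounds how many are in flight at once.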
Returns:
Name | Type | Description |
---|---|---|
UserDefinedFunction | UserDefinedFunction | A Spark pandas UDF configured to generate responses asynchronously. Output schema is |
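How a structured response_format could translate into a Spark output schema can be sketched as follows. This is only an illustration: struct_ddl and its type mapping are hypothetical, a plain annotated class stands in for the Pydantic model, and it assumes (as build_from_task is described as doing) that a structured format becomes a Spark struct.

```python
from typing import get_type_hints

# Assumed mapping from Python annotations to Spark SQL type names.
_SPARK_TYPES = {str: "string", int: "bigint", float: "double", bool: "boolean"}


def struct_ddl(model: type) -> str:
    """Return a Spark DDL struct string for a class with simple annotated fields."""
    fields = []
    for name, annotation in get_type_hints(model).items():
        if annotation not in _SPARK_TYPES:
            raise ValueError(f"unsupported field type for {name!r}: {annotation!r}")
        fields.append(f"{name}:{_SPARK_TYPES[annotation]}")
    return "struct<" + ",".join(fields) + ">"


class Translation:
    """Plain-class stand-in for the Pydantic model in the Setup example."""

    en: str
    fr: str


print(struct_ddl(Translation))  # struct<en:string,fr:string>
```

Nested models, lists, and optional fields are deliberately left out to keep the sketch short.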
Raises:
Type | Description |
---|---|
ValueError | If |
build_from_task ¶
build_from_task(
    task: PreparedTask,
    batch_size: int = 128,
    max_concurrency: int = 8,
) -> UserDefinedFunction
Builds the asynchronous pandas UDF from a predefined task.
This method allows users to create UDFs from predefined tasks such as sentiment analysis, translation, or other common NLP operations defined in the openaivec.task module.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
task | PreparedTask | A predefined task configuration containing instructions, response format, temperature, and top_p settings. | required |
batch_size | int | Number of rows per async batch request passed to the underlying | 128 |
max_concurrency | int | Maximum number of concurrent requests. Defaults to 8. | 8 |
Returns:
Name | Type | Description |
---|---|---|
UserDefinedFunction | UserDefinedFunction | A Spark pandas UDF configured to execute the specified task asynchronously, returning a struct derived from the task's response format. |
Example
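One way to picture a predefined task is as a bundle of the arguments that build() would otherwise take one by one. The sketch below is a self-contained stand-in rather than the library's code: SketchTask and build_kwargs_from_task are hypothetical names, and whether build_from_task actually forwards to build() this way is an assumption.

```python
from dataclasses import dataclass
from typing import Any, Dict, Type


@dataclass(frozen=True)
class SketchTask:
    """Hypothetical stand-in for PreparedTask: prompt plus sampling settings."""

    instructions: str
    response_format: Type[Any]
    temperature: float = 0.0
    top_p: float = 1.0


def build_kwargs_from_task(
    task: SketchTask,
    batch_size: int = 128,
    max_concurrency: int = 8,
) -> Dict[str, Any]:
    """Expand a task into the keyword arguments listed in the build() signature."""
    return {
        "instructions": task.instructions,
        "response_format": task.response_format,
        "batch_size": batch_size,
        "temperature": task.temperature,
        "top_p": task.top_p,
        "max_concurrency": max_concurrency,
    }


sentiment = SketchTask(
    instructions="Classify the sentiment of the text.",
    response_format=str,
)
kwargs = build_kwargs_from_task(sentiment, batch_size=64)
print(kwargs["batch_size"], kwargs["temperature"])  # 64 0.0
```

The caller controls only batch_size and max_concurrency; the prompt, response format, and sampling settings come from the task itself, which matches the build_from_task signature.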
EmbeddingsUDFBuilder dataclass ¶
Builder for asynchronous Spark pandas UDFs for creating embeddings.
Configures and builds UDFs that leverage pandas_ext.aio.embeddings
to generate vector embeddings from OpenAI models asynchronously.
An instance stores authentication parameters and the model name.
Attributes:
Name | Type | Description |
---|---|---|
api_key | str | OpenAI or Azure API key. |
endpoint | Optional[str] | Azure endpoint base URL. None for public OpenAI. |
api_version | Optional[str] | Azure API version. Ignored for public OpenAI. |
model_name | str | Deployment name (Azure) or model name (OpenAI) for embeddings. |
of_openai classmethod ¶
Creates a builder configured for the public OpenAI API.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
api_key | str | The OpenAI API key. | required |
model_name | str | The OpenAI model name for embeddings (e.g., "text-embedding-3-small"). | required |
Returns:
Name | Type | Description |
---|---|---|
EmbeddingsUDFBuilder | EmbeddingsUDFBuilder | A builder instance configured for OpenAI embeddings. |
of_azure_openai classmethod ¶
of_azure_openai(
    api_key: str,
    endpoint: str,
    api_version: str,
    model_name: str,
) -> EmbeddingsUDFBuilder
Creates a builder configured for Azure OpenAI.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
api_key | str | The Azure OpenAI API key. | required |
endpoint | str | The Azure OpenAI endpoint URL. | required |
api_version | str | The Azure OpenAI API version (e.g., "2024-02-01"). | required |
model_name | str | The Azure OpenAI deployment name for embeddings. | required |
Returns:
Name | Type | Description |
---|---|---|
EmbeddingsUDFBuilder | EmbeddingsUDFBuilder | A builder instance configured for Azure OpenAI embeddings. |
build ¶
Builds the asynchronous pandas UDF for generating embeddings.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
batch_size | int | Number of rows per async batch request passed to the underlying | 128 |
Returns:
Name | Type | Description |
---|---|---|
UserDefinedFunction | UserDefinedFunction | A Spark pandas UDF configured to generate embeddings asynchronously, returning an |
split_to_chunks_udf ¶
Create a pandas‑UDF that splits text into token‑bounded chunks.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
model_name | str | Model identifier passed to tiktoken. | required |
max_tokens | int | Maximum tokens allowed per chunk. | required |
sep | List[str] | Ordered list of separator strings used by | required |
Returns:
Type | Description |
---|---|
UserDefinedFunction | A pandas UDF producing an |
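The idea of token-bounded chunking with an ordered separator list can be sketched in plain Python. This is a simplified illustration and not the library's splitter: count_words is a whitespace stand-in for a real tokenizer such as tiktoken, the recursive strategy is an assumption, and the pandas UDF wrapper is omitted. It expects max_tokens to be at least 1 and separators to be non-empty strings.

```python
from typing import List


def count_words(text: str) -> int:
    """Stand-in tokenizer: counts whitespace-separated words, not model tokens."""
    return len(text.split())


def split_to_chunks(text: str, max_tokens: int, sep: List[str]) -> List[str]:
    """Split text on ordered separators so every chunk has at most max_tokens words."""
    if count_words(text) <= max_tokens:
        return [text] if text.strip() else []
    if not sep:
        # No separator left: fall back to a hard split on word boundaries.
        words = text.split()
        return [" ".join(words[i:i + max_tokens]) for i in range(0, len(words), max_tokens)]
    head, rest = sep[0], sep[1:]
    chunks: List[str] = []
    current = ""
    for piece in text.split(head):
        candidate = current + head + piece if current else piece
        if count_words(candidate) <= max_tokens:
            current = candidate  # still fits: keep growing the current chunk
            continue
        if current.strip():
            chunks.append(current)
        if count_words(piece) <= max_tokens:
            current = piece
        else:
            # The piece alone is too long: retry it with the finer separators.
            chunks.extend(split_to_chunks(piece, max_tokens, rest))
            current = ""
    if current.strip():
        chunks.append(current)
    return chunks


text = "one two three. four five. six seven eight nine ten"
print(split_to_chunks(text, 3, [". ", " "]))
# ['one two three', 'four five', 'six seven eight', 'nine ten']
```

Coarse separators are tried first so that chunks follow sentence or paragraph boundaries where possible; finer separators are used only for pieces that are still too long.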
count_tokens_udf ¶
Create a pandas‑UDF that counts tokens for every string cell.
The UDF uses tiktoken to approximate tokenisation and caches the resulting Encoding object per executor.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
model_name | str | Model identifier understood by | 'gpt-4o' |
Returns:
Type | Description |
---|---|
UserDefinedFunction | A pandas UDF producing an |
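The caching described above can be illustrated with a process-level cache around the encoding lookup. The sketch below is self-contained and makes several assumptions: SketchEncoding is a hypothetical regex-based stand-in for a tiktoken Encoding, get_encoding and count_tokens are illustrative names, and functools.lru_cache is one possible way to reuse the object within a worker process. The pandas UDF wrapper is omitted.

```python
import re
from functools import lru_cache
from typing import Iterable, List


class SketchEncoding:
    """Hypothetical stand-in for a tokenizer encoding: words and punctuation marks."""

    def __init__(self, model_name: str) -> None:
        self.model_name = model_name
        self._pattern = re.compile(r"\w+|[^\w\s]")

    def encode(self, text: str) -> List[str]:
        return self._pattern.findall(text)


@lru_cache(maxsize=None)
def get_encoding(model_name: str) -> SketchEncoding:
    """Build the encoding once per process and reuse it on later calls."""
    return SketchEncoding(model_name)


def count_tokens(cells: Iterable[str], model_name: str = "gpt-4o") -> List[int]:
    """Count tokens for every string cell using the cached encoding."""
    encoding = get_encoding(model_name)
    return [len(encoding.encode(cell)) for cell in cells]


print(count_tokens(["Hello, world!", ""]))  # [4, 0]
```

Because the cache is keyed by model name, repeated batches handled by the same process skip the cost of constructing the encoding again.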