Concurrency with Pandas¶
pd.DataFrame.aio
is an accessor that provides an async interface to OpenAI's API. It is designed to be used with Python's asyncio
library, allowing you to make non-blocking concurrent requests to the OpenAI API.
In [1]:
Copied!
# Standard library
import os
from enum import Enum

# Third-party
import pandas as pd
from pydantic import BaseModel, Field

# OpenAI clients and the pandas accessor extension
from openai import AsyncOpenAI, OpenAI
from openaivec import pandas_ext

# Register one synchronous and one asynchronous client, then select the
# models used by the .responses(...) and .embeddings(...) accessors below.
pandas_ext.use(OpenAI())
pandas_ext.use_async(AsyncOpenAI())
pandas_ext.responses_model("gpt-4.1-mini")
pandas_ext.embeddings_model("text-embedding-3-small")
# Standard library
import os
from enum import Enum

# Third-party
import pandas as pd
from pydantic import BaseModel, Field

# OpenAI clients and the pandas accessor extension
from openai import AsyncOpenAI, OpenAI
from openaivec import pandas_ext

# Register one synchronous and one asynchronous client, then select the
# models used by the .responses(...) and .embeddings(...) accessors below.
pandas_ext.use(OpenAI())
pandas_ext.use_async(AsyncOpenAI())
pandas_ext.responses_model("gpt-4.1-mini")
pandas_ext.embeddings_model("text-embedding-3-small")
The model name 'gpt-4.1-mini' is not supported by tiktoken. Instead, using the 'o200k_base' encoding.
In [2]:
Copied!
files_dict = {"path": []}
for root, dirs, files in os.walk('../../src/openaivec'):
for file in files:
path = os.path.join(root, file)
if "__" not in path:
files_dict["path"].append(os.path.join(root, file))
implements_df = pd.DataFrame(files_dict).assign(
module=lambda df: df["path"].str.split("/")
.map(lambda x: x[3:])
.map(lambda x: ".".join(x))
.map(lambda x: x.replace(".py", "")),
)
files_dict = {"path": []}
for root, dirs, files in os.walk('../../src/openaivec'):
for file in files:
path = os.path.join(root, file)
if "__" not in path:
files_dict["path"].append(os.path.join(root, file))
implements_df = pd.DataFrame(files_dict).assign(
module=lambda df: df["path"].str.split("/")
.map(lambda x: x[3:])
.map(lambda x: ".".join(x))
.map(lambda x: x.replace(".py", "")),
)
In [3]:
Copied!
implements_df
implements_df
Out[3]:
path | module | |
---|---|---|
0 | ../../src/openaivec/serialize.py | openaivec.serialize |
1 | ../../src/openaivec/responses.py | openaivec.responses |
2 | ../../src/openaivec/log.py | openaivec.log |
3 | ../../src/openaivec/util.py | openaivec.util |
4 | ../../src/openaivec/embeddings.py | openaivec.embeddings |
5 | ../../src/openaivec/pandas_ext.py | openaivec.pandas_ext |
6 | ../../src/openaivec/spark.py | openaivec.spark |
7 | ../../src/openaivec/prompt.py | openaivec.prompt |
8 | ../../src/openaivec/beta/batch.py | openaivec.beta.batch |
9 | ../../src/openaivec/beta/schema.py | openaivec.beta.schema |
10 | ../../src/openaivec/aio/spark.py | openaivec.aio.spark |
In [4]:
Copied!
# Response-format models for structured output from the Responses API.
# Comments (not docstrings) are used deliberately: pydantic folds class
# docstrings into the generated JSON schema, which would change the
# schema sent to the model.

# Kind of code object a documentation section describes.
# NOTE(review): "Opject" looks like a typo for "Object"; kept as-is
# because the rendered outputs below reference this exact name.
class OpjectType(str, Enum):
    FUNCTION = "function"
    CLASS = "class"
    VARIABLE = "variable"

# A single question/answer pair about one code section.
class Question(BaseModel):
    question: str = Field(description="The specific question related to the code section.")
    answer: str = Field(description="The corresponding answer explaining the code aspect.")

# Structured documentation for one function or class.
class Section(BaseModel):
    name: str = Field(description="The name of the function or class being documented.")
    type: OpjectType = Field(description="The type of the code section, either a function or a class.")
    description: str = Field(description="A concise summary of the function or class's purpose and functionality.")
    questions: list[Question] = Field(description="A list of Q&A pairs clarifying aspects of this code section.")

# Top-level response format: the full document for one source file.
class Document(BaseModel):
    sections: list[Section] = Field(description="A list of sections, each documenting a specific function or class.")

# Response-format models for structured output from the Responses API.
# (Second copy rendered by the notebook exporter.)
class OpjectType(str, Enum):
    FUNCTION = "function"
    CLASS = "class"
    VARIABLE = "variable"

class Question(BaseModel):
    question: str = Field(description="The specific question related to the code section.")
    answer: str = Field(description="The corresponding answer explaining the code aspect.")

class Section(BaseModel):
    name: str = Field(description="The name of the function or class being documented.")
    type: OpjectType = Field(description="The type of the code section, either a function or a class.")
    description: str = Field(description="A concise summary of the function or class's purpose and functionality.")
    questions: list[Question] = Field(description="A list of Q&A pairs clarifying aspects of this code section.")

class Document(BaseModel):
    sections: list[Section] = Field(description="A list of sections, each documenting a specific function or class.")
Note that async methods cannot be awaited from inside a lambda function. Use aio.assign
instead of assign
, which accepts lambdas that return awaitables produced by the async accessor.
Also note that top-level await is allowed in notebook cells.
In [5]:
Copied!
docs_df = await implements_df.aio.assign(
code=lambda df: df["path"].map(lambda x: open(x).read()),
doc=lambda df: df["code"].aio.responses(
instructions="Document the code in detail, including a summary, Q&A pairs, and explanations.",
response_format=Document,
batch_size=1,
),
)
docs_df = await implements_df.aio.assign(
code=lambda df: df["path"].map(lambda x: open(x).read()),
doc=lambda df: df["code"].aio.responses(
instructions="Document the code in detail, including a summary, Q&A pairs, and explanations.",
response_format=Document,
batch_size=1,
),
)
In [6]:
Copied!
docs_df
docs_df
Out[6]:
path | module | code | doc | |
---|---|---|---|---|
0 | ../../src/openaivec/serialize.py | openaivec.serialize | from enum import Enum\nfrom typing import Any,... | sections=[Section(name='serialize_base_model',... |
1 | ../../src/openaivec/responses.py | openaivec.responses | """Vectorized interaction helpers for OpenAI c... | sections=[Section(name='_vectorize_system_mess... |
2 | ../../src/openaivec/log.py | openaivec.log | import functools\nimport json\nimport time\nim... | sections=[Section(name='observe', type=<Opject... |
3 | ../../src/openaivec/util.py | openaivec.util | import asyncio\nimport functools\nimport re\ni... | sections=[Section(name='get_exponential_with_c... |
4 | ../../src/openaivec/embeddings.py | openaivec.embeddings | """Embedding utilities built on top of OpenAI’... | sections=[Section(name='BatchEmbeddings', type... |
5 | ../../src/openaivec/pandas_ext.py | openaivec.pandas_ext | """Pandas Series / DataFrame extension for Ope... | sections=[Section(name='Module Overview', type... |
6 | ../../src/openaivec/spark.py | openaivec.spark | """Spark UDFs for the OpenAI and Azure OpenAI ... | sections=[Section(name='UDFBuilder', type=<Opj... |
7 | ../../src/openaivec/prompt.py | openaivec.prompt | """\nThis module provides a builder for creati... | sections=[Section(name='Example', type=<Opject... |
8 | ../../src/openaivec/beta/batch.py | openaivec.beta.batch | import json\nfrom dataclasses import dataclass... | sections=[Section(name='JsonlUDFBuilder', type... |
9 | ../../src/openaivec/beta/schema.py | openaivec.beta.schema | from pydantic import BaseModel\n\n__all__ = [\... | sections=[Section(name='Entity', type=<OpjectT... |
10 | ../../src/openaivec/aio/spark.py | openaivec.aio.spark | """Asynchronous Spark UDFs for the OpenAI and ... | sections=[Section(name='_initialize', type=<Op... |
In [7]:
Copied!
# Flatten the nested Document structure into one row per Q&A pair, then
# embed each question. .aio.pipe / .aio.assign are used because plain
# .pipe / .assign cannot await the async accessor calls.
questions_df = await docs_df.aio.pipe(
    lambda df: df
    .drop(columns=["code"])
    # Expand the pydantic Document into doc_* columns.
    .ai.extract("doc")
    # One row per Section, then expand the Section fields.
    .explode("doc_sections")
    .ai.extract("doc_sections")
    # One row per Question, then expand the Question fields.
    .explode("doc_sections_questions")
    .ai.extract("doc_sections_questions")
    .reset_index(drop=True)
    .aio.assign(
        # Store the enum's string value instead of the Enum member.
        doc_sections_type=lambda df: df.doc_sections_type.map(lambda x: x.value),
        # Embed each question text concurrently via the async accessor.
        embedding=lambda df: df["doc_sections_questions_question"].aio.embeddings()
    )
)
# (Second copy rendered by the notebook exporter.)
questions_df = await docs_df.aio.pipe(
    lambda df: df
    .drop(columns=["code"])
    .ai.extract("doc")
    .explode("doc_sections")
    .ai.extract("doc_sections")
    .explode("doc_sections_questions")
    .ai.extract("doc_sections_questions")
    .reset_index(drop=True)
    .aio.assign(
        doc_sections_type=lambda df: df.doc_sections_type.map(lambda x: x.value),
        embedding=lambda df: df["doc_sections_questions_question"].aio.embeddings()
    )
)
In [8]:
Copied!
questions_df
questions_df
Out[8]:
path | module | doc_sections_name | doc_sections_type | doc_sections_description | doc_sections_questions_question | doc_sections_questions_answer | embedding | |
---|---|---|---|---|---|---|---|---|
0 | ../../src/openaivec/serialize.py | openaivec.serialize | serialize_base_model | function | This function takes a Pydantic BaseModel class... | What does serialize_base_model expect as input? | It expects a Pydantic BaseModel class (a type)... | [0.0102652, 0.039048016, 0.01654257, -0.025273... |
1 | ../../src/openaivec/serialize.py | openaivec.serialize | serialize_base_model | function | This function takes a Pydantic BaseModel class... | What does serialize_base_model return? | It returns a dictionary representing the JSON ... | [-0.0023623717, 0.02963441, 0.019845841, -0.02... |
2 | ../../src/openaivec/serialize.py | openaivec.serialize | serialize_base_model | function | This function takes a Pydantic BaseModel class... | Which Pydantic method is used to generate the ... | The method model_json_schema() of the BaseMode... | [0.01775365, -0.011621917, 0.043650042, -0.030... |
3 | ../../src/openaivec/serialize.py | openaivec.serialize | dereference_json_schema | function | This function takes a JSON schema dictionary t... | What is the purpose of dereference_json_schema? | To replace all $ref references in a JSON schem... | [0.04460072, 0.022822931, 0.021498278, -0.0036... |
4 | ../../src/openaivec/serialize.py | openaivec.serialize | dereference_json_schema | function | This function takes a JSON schema dictionary t... | How does the function handle nested references? | It recursively dereferences any nested $ref fo... | [-0.0071634245, 0.013306478, 0.012910823, -0.0... |
... | ... | ... | ... | ... | ... | ... | ... | ... |
120 | ../../src/openaivec/aio/spark.py | openaivec.aio.spark | ResponsesUDFBuilder | class | A builder class for creating asynchronous Spar... | What happens if an unsupported response_format... | The build method raises a ValueError indicatin... | [-0.026541049, -0.0029880104, 0.08570135, -0.0... |
121 | ../../src/openaivec/aio/spark.py | openaivec.aio.spark | EmbeddingsUDFBuilder | class | A builder class for creating asynchronous Spar... | What is the purpose of EmbeddingsUDFBuilder? | It builds asynchronous Spark pandas UDFs to ge... | [-0.043919045, -0.004853987, 0.04757695, 0.012... |
122 | ../../src/openaivec/aio/spark.py | openaivec.aio.spark | EmbeddingsUDFBuilder | class | A builder class for creating asynchronous Spar... | How do you create an EmbeddingsUDFBuilder for ... | Use the class method of_openai with the API ke... | [-0.03296745, -0.017957475, 0.06286354, 0.0066... |
123 | ../../src/openaivec/aio/spark.py | openaivec.aio.spark | EmbeddingsUDFBuilder | class | A builder class for creating asynchronous Spar... | How do you create an EmbeddingsUDFBuilder for ... | Use the class method of_azure_openai with the ... | [-0.044121046, -0.021955194, 0.0666614, 0.0043... |
124 | ../../src/openaivec/aio/spark.py | openaivec.aio.spark | EmbeddingsUDFBuilder | class | A builder class for creating asynchronous Spar... | What does the build method return? | A Spark pandas UDF that asynchronously generat... | [-0.0103505375, 0.024601478, 0.041760493, -0.0... |
125 rows × 8 columns