核心模块(Modules)
数据连接(DataConnection)
文档加载器(DocumentLoaders)
自定义(Custom)

自定义文档加载器

概述

基于LLMs的应用程序通常涉及从数据库或文件(如PDF)中提取数据,并将其转换为LLMs可以利用的格式。在LangChain中,这通常涉及创建文档对象,该对象封装了提取的文本(page_content)以及包含有关文档详细信息的元数据 - 一个包含有关文档的作者姓名或发布日期等详细信息的字典。

Document对象通常被格式化为提示,然后被馈送到LLM中,允许LLM使用Document中的信息生成所需的响应(例如,对文档进行总结)。Documents可以立即使用,也可以索引到矢量存储以供将来检索和使用。

文档加载的主要抽象为:

组件描述
Document包含textmetadata
BaseLoader用于将原始数据转换为Documents
Blob二进制数据的表示,可以位于文件或内存中
BaseBlobParser逻辑解析Blob以产生Document对象

本指南将演示如何编写自定义文档加载和文件解析逻辑;具体来说,我们将看到如何:

  1. 通过从BaseLoader继承而创建标准文档加载器。
  2. 使用BaseBlobParser创建解析器,并将其与BlobBlobLoaders一起使用。这在处理文件时非常有用。

标准文档加载器

文档加载器可通过从BaseLoader继承实现,后者提供了加载文档的标准接口。

接口

方法名解释
lazy_load用于惰性逐个加载文档。用于生产代码。
alazy_loadlazy_load的异步变体
load用于急性加载所有文档到内存中。用于原型设计或交互工作。
aload用于急性加载所有文档到内存中。用于原型设计或交互工作。 添加于LangChain的2024年04月
  • load方法仅适用于原型设计工作 - 它只调用list(self.lazy_load())
  • alazy_load有一个默认实现,会委托给lazy_load。如果您使用异步操作,建议重写默认实现并提供本机异步实现。

::: ⚠⚠⚠ 在实现文档加载器时,不要通过lazy_loadalazy_load方法传递参数。 预期所有配置通过初始化程序(init)传递。这是LangChain的设计选择,以确保一旦实例化文档加载器,就会具有加载文档所需的所有信息。 :::

实现

让我们创建一个标准文档加载器的示例,该加载器会读取文件,并为文件中的每一行创建一个文档。

from typing import AsyncIterator, Iterator
 
from langchain_core.document_loaders import BaseLoader
from langchain_core.documents import Document
 
 
class CustomDocumentLoader(BaseLoader):
    """读取文件的示例文档加载器,每行创建一个文档。"""
 
    def __init__(self, file_path: str) -> None:
        """使用文件路径初始化加载器。
 
        Args:
            file_path: 要加载的文件的路径。
        """
        self.file_path = file_path
 
    def lazy_load(self) -> Iterator[Document]:  # <-- 不带任何参数
        """逐行读取文件的懒加载器。
 
        在实现惰性加载方法时,应使用生成器逐个生成文档。
        """
        with open(self.file_path, encoding="utf-8") as f:
            line_number = 0
            for line in f:
                yield Document(
                    page_content=line,
                    metadata={"line_number": line_number, "source": self.file_path},
                )
                line_number += 1
 
    # alazy_load是可选的。
    # 如果省略实现,则将使用一个委托给lazy_load的默认实现!
    async def alazy_load(
        self,
    ) -> AsyncIterator[Document]:  # <-- 不带任何参数
        """逐行读取文件的异步懒加载器。"""
        # 需要aiofiles
        # 使用 `pip install aiofiles` 安装
        # https://github.com/Tinche/aiofiles
        import aiofiles
 
        async with aiofiles.open(self.file_path, encoding="utf-8") as f:
            line_number = 0
            async for line in f:
                yield Document(
                    page_content=line,
                    metadata={"line_number": line_number, "source": self.file_path},
                )
                line_number += 1

测试 🧪

要测试文档加载器,我们需要一个包含一些优质内容的文件。

with open("./meow.txt", "w", encoding="utf-8") as f:
    quality_content = "meow meow🐱 \n meow meow🐱 \n meow😻😻"
    f.write(quality_content)
 
loader = CustomDocumentLoader("./meow.txt")
## 测试懒加载接口
for doc in loader.lazy_load():
    print()
    print(type(doc))
    print(doc)
<class 'langchain_core.documents.base.Document'>
page_content='meow meow🐱 \n' metadata={'line_number': 0, 'source': './meow.txt'}

<class 'langchain_core.documents.base.Document'>
page_content=' meow meow🐱 \n' metadata={'line_number': 1, 'source': './meow.txt'}

<class 'langchain_core.documents.base.Document'>
page_content=' meow😻😻' metadata={'line_number': 2, 'source': './meow.txt'}
## 测试异步实现
async for doc in loader.alazy_load():
    print()
    print(type(doc))
    print(doc)
<class 'langchain_core.documents.base.Document'>
page_content='meow meow🐱 \n' metadata={'line_number': 0, 'source': './meow.txt'}

<class 'langchain_core.documents.base.Document'>
page_content=' meow meow🐱 \n' metadata={'line_number': 1, 'source': './meow.txt'}

<class 'langchain_core.documents.base.Document'>
page_content=' meow😻😻' metadata={'line_number': 2, 'source': './meow.txt'}

::: {.callout-tip}

load()在诸如jupyter笔记本之类的交互环境中很有用。

避免用于生产代码,因为急性加载假定所有内容都可以适合内存,而在某些情况下(特别是企业数据),情况并非总是如此。 :::

loader.load()
[Document(page_content='meow meow🐱 \n', metadata={'line_number': 0, 'source': './meow.txt'}),
 Document(page_content=' meow meow🐱 \n', metadata={'line_number': 1, 'source': './meow.txt'}),
 Document(page_content=' meow😻😻', metadata={'line_number': 2, 'source': './meow.txt'})]

使用文件

许多文档加载器涉及解析文件。这些加载器之间的区别通常在于如何解析文件而不是如何加载文件。例如,您可以使用open读取PDF文件或标记文件的二进制内容,但您需要不同的解析逻辑将二进制数据转换为文本。

因此,将解析逻辑与加载逻辑分离可能非常有用,这样一来,无论数据如何加载,都可以更容易地重用给定的解析器。

BaseBlobParser

BaseBlobParser是一个接口,接受一个blob并输出一个Document对象列表。blob是一个表示数据的实体,可以在内存中或文件中存在。LangChain python具有Blob原语,受Blob WebAPI规范 (opens in a new tab)的启发。

from langchain_core.document_loaders import BaseBlobParser, Blob
 
class MyParser(BaseBlobParser):
    """一个将每行转换为文档的简单解析器。"""
 
    def lazy_parse(self, blob: Blob) -> Iterator[Document]:
        """将一个blob逐行解析为文档。"""
        line_number = 0
        with blob.as_bytes_io() as f:
            for line in f:
                line_number += 1
                yield Document(
                    page_content=line,
                    metadata={"line_number": line_number, "source": blob.source},
                )

使用blob API还允许从内存直接加载内容,而无需从文件中读取!

blob = Blob.from_path("./meow.txt")
parser = MyParser()
list(parser.lazy_parse(blob))
[Document(page_content='meow meow🐱 \n', metadata={'line_number': 1, 'source': './meow.txt'}),
 Document(page_content=' meow meow🐱 \n', metadata={'line_number': 2, 'source': './meow.txt'}),
 Document(page_content=' meow😻😻', metadata={'line_number': 3, 'source': './meow.txt'})]

使用blob API还可以直接从内存加载内容,而无需从文件中读取!

blob = Blob(data=b"some data from memory\nmeow")
list(parser.lazy_parse(blob))
[Document(page_content='some data from memory\n', metadata={'line_number': 1, 'source': None}),
 Document(page_content='meow', metadata={'line_number': 2, 'source': None})=======

            

Blob

让我们简要地浏览一下 Blob API 的一些内容。

blob = Blob.from_path("./meow.txt", metadata={"foo": "bar"})
blob.encoding

'utf-8'

blob.as_bytes()

b'meow meow\xf0\x9f\x90\xb1 \n meow meow\xf0\x9f\x90\xb1 \n meow\xf0\x9f\x98\xbb\xf0\x9f\x98\xbb'

blob.as_string()

'meow meow🐱 \n meow meow🐱 \n meow😻😻'

blob.as_bytes_io()

<contextlib._GeneratorContextManager at 0x743f34324450>

blob.metadata

{'foo': 'bar'}

blob.source

'./meow.txt'

Blob Loaders

虽然解析器封装了将二进制数据解析为文档所需的逻辑,但是 blob loaders 封装了从给定存储位置加载 blob 所需的逻辑。

目前, LangChain 仅支持 FileSystemBlobLoader

您可以使用 FileSystemBlobLoader 来加载 blob,然后使用解析器对其进行解析。

from langchain_community.document_loaders.blob_loaders import FileSystemBlobLoader
 
blob_loader = FileSystemBlobLoader(path=".", glob="*.mdx", show_progress=True)
parser = MyParser()
for blob in blob_loader.yield_blobs():
    for doc in parser.lazy_parse(blob):
        print(doc)
        break
0%|          | 0/8 [00:00<?, ?it/s]

page_content='# Microsoft Office\n' metadata={'line_number': 1, 'source': 'office_file.mdx'}
page_content='# Markdown\n' metadata={'line_number': 1, 'source': 'markdown.mdx'}
page_content='# JSON\n' metadata={'line_number': 1, 'source': 'json.mdx'}
page_content='---\n' metadata={'line_number': 1, 'source': 'pdf.mdx'}
page_content='---\n' metadata={'line_number': 1, 'source': 'index.mdx'}
page_content='# File Directory\n' metadata={'line_number': 1, 'source': 'file_directory.mdx'}
page_content='# CSV\n' metadata={'line_number': 1, 'source': 'csv.mdx'}
page_content='# HTML\n' metadata={'line_number': 1, 'source': 'html.mdx'}

Generic Loader

LangChain 在 GenericLoader 中组合了 BlobLoaderBaseBlobParser 的抽象。

GenericLoader 旨在提供标准化的类方法,使使用现有的 BlobLoader 实现变得容易。目前,只支持 FileSystemBlobLoader

from langchain_community.document_loaders.generic import GenericLoader
 
loader = GenericLoader.from_filesystem(
    path=".", glob="*.mdx", show_progress=True, parser=MyParser()
)
 
for idx, doc in enumerate(loader.lazy_load()):
    if idx < 5:
        print(doc)
 
print("... output truncated for demo purposes")
0%|          | 0/8 [00:00<?, ?it/s]

page_content='# Microsoft Office\n' metadata={'line_number': 1, 'source': 'office_file.mdx'}
page_content='\n' metadata={'line_number': 2, 'source': 'office_file.mdx'}
page_content='>[The Microsoft Office](https://www.office.com/) suite of productivity software includes Microsoft Word, Microsoft Excel, Microsoft PowerPoint, Microsoft Outlook, and Microsoft OneNote. It is available for Microsoft Windows and macOS operating systems. It is also available on Android and iOS.\n' metadata={'line_number': 3, 'source': 'office_file.mdx'}
page_content='\n' metadata={'line_number': 4, 'source': 'office_file.mdx'}
page_content='This covers how to load commonly used file formats including `DOCX`, `XLSX` and `PPTX` documents into a document format that we can use downstream.\n' metadata={'line_number': 5, 'source': 'office_file.mdx'}
... output truncated for demo purposes

Custom Generic Loader

如果您喜欢创建类,可以创建一个类来封装一起的逻辑。

您可以从这个类继承以使用现有的加载器加载内容。

from typing import Any
 
 
class MyCustomLoader(GenericLoader):
    @staticmethod
    def get_parser(**kwargs: Any) -> BaseBlobParser:
        """Override this method to associate a default parser with the class."""
        return MyParser()
loader = MyCustomLoader.from_filesystem(path=".", glob="*.mdx", show_progress=True)
 
for idx, doc in enumerate(loader.lazy_load()):
    if idx < 5:
        print(doc)
 
print("... output truncated for demo purposes")
0%|          | 0/8 [00:00<?, ?it/s]

page_content='# Microsoft Office\n' metadata={'line_number': 1, 'source': 'office_file.mdx'}
page_content='\n' metadata={'line_number': 2, 'source': 'office_file.mdx'}
page_content='>[The Microsoft Office](https://www.office.com/) suite of productivity software includes Microsoft Word, Microsoft Excel, Microsoft PowerPoint, Microsoft Outlook, and Microsoft OneNote. It is available for Microsoft Windows and macOS operating systems. It is also available on Android and iOS.\n' metadata={'line_number': 3, 'source': 'office_file.mdx'}
page_content='\n' metadata={'line_number': 4, 'source': 'office_file.mdx'}
page_content='This covers how to load commonly used file formats including `DOCX`, `XLSX` and `PPTX` documents into a document format that we can use downstream.\n' metadata={'line_number': 5, 'source': 'office_file.mdx'}
... output truncated for demo purposes# 自定义输出解析器

在某些情况下,您可能希望实现一个自定义解析器,以将模型输出结构化为自定义格式。

实现自定义解析器有两种方式:

  1. 在 LCEL 中使用 RunnableLambdaRunnableGenerator -- 我们强烈推荐在大多数情况下使用这种方式
  2. 通过继承基类之一来实现解析器 -- 这是一种比较困难的方式

这两种方式的区别主要在于触发的回调方式(例如 on_chain_starton_parser_start),以及可追踪平台中像 LangSmith 这样的可运行 lambda 与解析器的可视化方式。

可运行的 Lambda 和生成器

推荐的解析方式是使用 可运行的 lambda可运行的生成器

这里,我们将创建一个简单的解析器,将模型输出的大小写颠倒。

例如,如果模型输出为:“喵”,则解析器将生成“mEOW”。

from typing import Iterable
 
from langchain_anthropic.chat_models import ChatAnthropic
from langchain_core.messages import AIMessage, AIMessageChunk
 
model = ChatAnthropic(model_name="claude-2.1")
 
def parse(ai_message: AIMessage) -> str:
    """解析 AI 消息。"""
    return ai_message.content.swapcase()
 
chain = model | parse
chain.invoke("hello")
'hELLO!'

:::⚠⚠⚠

LCEL 在使用 | 语法进行组合时,会自动将函数 parse 升级为 RunnableLambda(parse)

如果你不喜欢这种方式,你可以手动导入 RunnableLambda,然后运行 parse = RunnableLambda(parse)。 :::

流式工作吗?

for chunk in chain.stream("tell me about yourself in one sentence"):
    print(chunk, end="|", flush=True)
i'M cLAUDE, AN ai ASSISTANT CREATED BY aNTHROPIC TO BE HELPFUL, HARMLESS, AND HONEST!|

不行,因为解析器会在解析输出之前聚合输入。

如果我们想要实现流式解析器,可以使解析器接受输入的可迭代对象,并在结果可用时生成结果。

from langchain_core.runnables import RunnableGenerator
 
def streaming_parse(chunks: Iterable[AIMessageChunk]) -> Iterable[str]:
    for chunk in chunks:
        yield chunk.content.swapcase()
 
streaming_parse = RunnableGenerator(streaming_parse)

:::⚠⚠⚠

请将流式解析器包装在 RunnableGenerator 中,因为我们可能不再自动使用 | 语法升级它。 :::

chain = model | streaming_parse
chain.invoke("hello")
'hELLO!'

让我们确认一下流式工作!

for chunk in chain.stream("tell me about yourself in one sentence"):
        print(chunk, end="|", flush=True)
i|'M| cLAUDE|,| AN| ai| ASSISTANT| CREATED| BY| aN|THROP|IC| TO| BE| HELPFUL|,| HARMLESS|,| AND| HONEST|.!|

继承解析基类

另一种实现解析器的方法是通过继承 BaseOutputParserBaseGenerationOutputParser 或其他一个基本解析器中的类,具体取决于您需要做什么。

一般来说,我们不建议使用这种方式,因为这样会导致更多需要编写的代码,而且并没有显著的好处。

最简单的输出解析器扩展了 BaseOutputParser 类,并且必须实现以下方法:

  • parse:接收模型的字符串输出并对其进行解析
  • (可选)_type:标识解析器的名称

当聊天模型或 LLM 的输出格式不正确时,可以抛出 OutputParserException 来指示解析失败是由于错误的输入。使用此异常可以让利用解析器的代码以一致的方式处理异常。

:::⚠⚠⚠

解析器也是可运行的!🏃

由于 BaseOutputParser 实现了 Runnable 接口,您通过这种方式创建的任何自定义解析器都将成为有效的 LangChain 可运行对象,并且将从自动异步支持、批处理接口、日志支持等中受益。 :::

简单解析器

这是一个简单的解析器,可以解析表示布尔值的 字符串(例如 YESNO)并将其转换为相应的 boolean 类型。

from langchain_core.exceptions import OutputParserException
from langchain_core.output_parsers import BaseOutputParser
 
# [bool] 描述了一个泛型的参数化。
# 这基本上表示 parse 的返回类型,即 True 或 False
class BooleanOutputParser(BaseOutputParser[bool]):
    """自定义布尔解析器。"""
 
    true_val: str = "YES"
    false_val: str = "NO"
 
    def parse(self, text: str) -> bool:
        cleaned_text = text.strip().upper()
        if cleaned_text not in (self.true_val.upper(), self.false_val.upper()):
            raise OutputParserException(
                f"BooleanOutputParser 期望输出值为 "
                f"{self.true_val}{self.false_val}(不区分大小写)。 "
                f"接收到 {cleaned_text}。"
            )
        return cleaned_text == self.true_val.upper()
 
    @property
    def _type(self) -> str:
        return "boolean_output_parser"
parser = BooleanOutputParser()
parser.invoke("YES")
True
try:
    parser.invoke("MEOW")
except Exception as e:
    print(f"触发了 {type(e)} 类型的异常")
触发了 <class 'langchain_core.exceptions.OutputParserException'> 类型的异常

让我们测试一下更改参数化是否有效

parser = BooleanOutputParser(true_val="OKAY")
parser.invoke("OKAY")
True

让我们确认其他 LCEL 方法是否存在

parser.batch(["OKAY", "NO"])
[True, False]
await parser.abatch(["OKAY", "NO"])
[True, False]
from langchain_anthropic.chat_models import ChatAnthropic
 
anthropic = ChatAnthropic(model_name="claude-2.1")
anthropic.invoke("say OKAY or NO")
AIMessage(content='OKAY')

让我们测试一下我们的解析器是否能正常工作!

chain = anthropic | parser
chain.invoke("say OKAY or NO")
True

:::{.callout-note} 解析器可以处理 LLM 的输出(字符串)或聊天模型的输出(AIMessage)! :::

解析原始模型输出

有时除了原始文本之外,还有一些重要的模型输出元数据。一个例子是工具调用,其中用于传递给调用函数的参数被返回到一个单独的属性中。如果您需要这种更精细的控制,您可以选择继承BaseGenerationOutputParser类。

这个类需要一个名为parse_result的方法。这个方法接受原始模型输出(例如,GenerationChatGeneration的列表)并返回解析后的输出。

支持GenerationChatGeneration两种类型,使解析器可以同时与普通语言模型和聊天模型一起使用。

from typing import List
 
from langchain_core.exceptions import OutputParserException
from langchain_core.messages import AIMessage
from langchain_core.output_parsers import BaseGenerationOutputParser
from langchain_core.outputs import ChatGeneration, Generation
 
 
class StrInvertCase(BaseGenerationOutputParser[str]):
    """一个示例解析器,翻转消息中字符的大小写。
 
    这是一个演示示例,目的是展示简单。
    """
 
    def parse_result(self, result: List[Generation], *, partial: bool = False) -> str:
        """将模型生成的结果解析成特定格式的方法。
 
        参数:
            result: 一个待解析的Generation列表。假定这些Generation是用于单个模型输入的不同候选输出。
                    许多解析器假定只传入一个单一生成。
                    我们会对此进行断言
            partial: 是否允许部分结果。这用于支持流式处理的解析器。
        """
        if len(result) != 1:
            raise NotImplementedError(
                "此输出解析器只能用于单个生成。"
            )
        generation = result[0]
        if not isinstance(generation, ChatGeneration):
            # 表示此解析器只能用于聊天生成
            raise OutputParserException(
                "此输出解析器只能用于聊天生成。"
            )
        return generation.message.content.swapcase()
 
 
chain = anthropic | StrInvertCase()

让我们尝试一下新的解析器!它应该会翻转模型的输出。

chain.invoke("Tell me a short sentence about yourself")

'HELLO! MY NAME IS CLAUDE.'