
A Look at langchain4j's HTTP Client

suiw9 2025-03-29 21:56

This article takes a look at langchain4j's HTTP Client.

langchain4j-http-client

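langchain4j provides the langchain4j-http-client module, which defines an HttpClient SPI (Service Provider Interface) that other modules use to call the REST APIs of LLM providers. This means the underlying HTTP client can be customized, and any other HTTP client can be integrated by implementing the HttpClient SPI. Two implementations are currently available out of the box: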

  • JdkHttpClient in the langchain4j-http-client-jdk module. It is the default in modules that reference it, such as langchain4j-open-ai.
  • SpringRestClient in the langchain4j-http-client-spring-restclient module. It is the default in Spring Boot starters that reference it, such as langchain4j-open-ai-spring-boot-starter.

HttpClient

langchain4j-http-client/src/main/java/dev/langchain4j/http/client/HttpClient.java

@Experimental
public interface HttpClient {

    /**
     * Executes a given HTTP request synchronously and returns the response.
     * This method blocks until the entire response is received.
     *
     * @param request the HTTP request to be executed.
     * @return a {@link SuccessfulHttpResponse} containing the response data for successful HTTP requests (2XX status codes)
     * @throws HttpException    if the server returns a client (4XX) or server (5XX) error response
     * @throws RuntimeException if an unexpected error occurs during request execution (e.g., network issues, timeouts)
     */
    SuccessfulHttpResponse execute(HttpRequest request) throws HttpException, RuntimeException;

    /**
     * Executes a given HTTP request asynchronously with server-sent events (SSE) handling.
     * This method returns immediately while processing continues on a separate thread.
     * Events are processed through the provided {@link ServerSentEventListener}.
     * <p>
     * The execution flow is as follows:
     * <ol>
     *   <li>The request is initiated asynchronously</li>
     *   <li>Received SSE data is parsed using the {@link DefaultServerSentEventParser}</li>
     *   <li>Parsed events are delivered to the listener's appropriate methods</li>
     *   <li>If an error occurs, {@link ServerSentEventListener#onError(Throwable)} is called</li>
     * </ol>
     * <p>
     * If any exception is thrown from the listener's methods, the stream processing
     * will be terminated and no further events will be processed.
     *
     * @param request  the HTTP request to be executed.
     * @param listener the listener to receive parsed events and error notifications.
     */
    default void execute(HttpRequest request, ServerSentEventListener listener) {
        execute(request, new DefaultServerSentEventParser(), listener);
    }

    /**
     * Executes a given HTTP request asynchronously with server-sent events (SSE) handling.
     * This method returns immediately while processing continues on a separate thread.
     * Events are processed through the provided {@link ServerSentEventListener}.
     * <p>
     * The execution flow is as follows:
     * <ol>
     *   <li>The request is initiated asynchronously</li>
     *   <li>Received SSE data is parsed using the provided parser</li>
     *   <li>Parsed events are delivered to the listener's appropriate methods</li>
     *   <li>If an error occurs, {@link ServerSentEventListener#onError(Throwable)} is called</li>
     * </ol>
     * <p>
     * If any exception is thrown from the listener's methods, the stream processing
     * will be terminated and no further events will be processed.
     *
     * @param request  the HTTP request to be executed.
     * @param parser   the parser to process incoming server-sent events.
     * @param listener the listener to receive parsed events and error notifications.
     */
    void execute(HttpRequest request, ServerSentEventParser parser, ServerSentEventListener listener);
}

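HttpClient defines three execute methods: a blocking one that returns a SuccessfulHttpResponse, and two streaming overloads that deliver server-sent events to a ServerSentEventListener. The two-argument overload simply delegates to the three-argument one with a DefaultServerSentEventParser.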
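The streaming flow described in the Javadoc (read the response stream, parse it, hand each event to a listener) can be illustrated with a small sketch that uses only the JDK. SseSketch, its Listener interface, and the parse and collect methods are hypothetical names made up for this illustration. This is not langchain4j's DefaultServerSentEventParser: it only understands the event: and data: fields and drops an event that is not terminated by a blank line.

```java
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-in; not langchain4j's parser or listener types.
class SseSketch {

    interface Listener {
        void onEvent(String event, String data);
        void onError(Throwable error);
    }

    // Reads the stream line by line; a blank line dispatches the buffered event.
    static void parse(InputStream in, Listener listener) {
        try (BufferedReader reader =
                     new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8))) {
            String event = null;
            StringBuilder data = new StringBuilder();
            String line;
            while ((line = reader.readLine()) != null) {
                if (line.isEmpty()) {
                    if (data.length() > 0) {
                        listener.onEvent(event, data.toString());
                    }
                    event = null;
                    data.setLength(0);
                } else if (line.startsWith("event:")) {
                    event = line.substring("event:".length()).trim();
                } else if (line.startsWith("data:")) {
                    if (data.length() > 0) {
                        data.append('\n'); // multiple data lines are joined with newlines
                    }
                    data.append(line.substring("data:".length()).trim());
                }
            }
        } catch (IOException e) {
            listener.onError(e);
        }
    }

    // Helper for the demo: parses a raw SSE payload and records what the listener saw.
    static List<String> collect(String raw) {
        List<String> received = new ArrayList<>();
        parse(new ByteArrayInputStream(raw.getBytes(StandardCharsets.UTF_8)), new Listener() {
            @Override
            public void onEvent(String event, String data) {
                received.add(event + "|" + data);
            }

            @Override
            public void onError(Throwable error) {
                received.add("error|" + error.getMessage());
            }
        });
        return received;
    }

    public static void main(String[] args) {
        // Prints [delta|Hello, null|World]: the second event carries no event name.
        System.out.println(collect("event: delta\ndata: Hello\n\ndata: World\n\n"));
    }
}
```

Taking the parser as a separate argument, as the three-argument execute does, keeps the transport code independent of the wire format, so a provider with a non-standard event stream can supply its own parser.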

HttpClientBuilderFactory

langchain4j-http-client/src/main/java/dev/langchain4j/http/client/HttpClientBuilderFactory.java

@Experimental
public interface HttpClientBuilderFactory {

    HttpClientBuilder create();
}

HttpClientBuilderFactory defines a single create method that returns an HttpClientBuilder.

HttpClientBuilder

langchain4j-http-client/src/main/java/dev/langchain4j/http/client/HttpClientBuilder.java

@Experimental
public interface HttpClientBuilder {

    Duration connectTimeout();

    HttpClientBuilder connectTimeout(Duration timeout);

    Duration readTimeout();

    HttpClientBuilder readTimeout(Duration timeout);

    HttpClient build();
}

HttpClientBuilder defines getters and setters for connectTimeout and readTimeout, plus a build method that returns the HttpClient.

HttpClientBuilderLoader

langchain4j-http-client/src/main/java/dev/langchain4j/http/client/HttpClientBuilderLoader.java

@Experimental
public class HttpClientBuilderLoader {

    public static HttpClientBuilder loadHttpClientBuilder() {
        Collection<HttpClientBuilderFactory> factories = loadFactories(HttpClientBuilderFactory.class);

        if (factories.size() > 1) {
            List<String> factoryNames = factories.stream()
                    .map(factory -> factory.getClass().getName())
                    .toList();
            throw new IllegalStateException(String.format("Conflict: multiple HTTP clients have been found " +
                    "in the classpath: %s. Please explicitly specify the one you wish to use.", factoryNames));
        }

        for (HttpClientBuilderFactory factory : factories) {
            return factory.create();
        }

        throw new IllegalStateException("No HTTP client has been found in the classpath");
    }
}

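HttpClientBuilderLoader provides the loadHttpClientBuilder method, which loads the HttpClientBuilderFactory implementations through dev.langchain4j.spi.ServiceHelper.loadFactories. It throws an IllegalStateException if more than one factory is found on the classpath or if none is found; otherwise it returns the builder created by the single factory.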
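The "exactly one provider" rule used by loadHttpClientBuilder can be isolated into a few lines. The following is a minimal sketch with a hypothetical class and method name (SingleProviderSketch, requireSingle) and plain strings standing in for factories; it mirrors the selection logic only and is not the library class.

```java
import java.util.Collection;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical stand-in that mirrors the selection rule; not the library class.
class SingleProviderSketch {

    // Returns the only candidate, or fails when there are zero or several.
    static <T> T requireSingle(Collection<T> candidates) {
        if (candidates.size() > 1) {
            List<String> names = candidates.stream()
                    .map(candidate -> candidate.getClass().getName())
                    .collect(Collectors.toList());
            throw new IllegalStateException("Conflict: multiple providers found: " + names);
        }
        for (T candidate : candidates) {
            return candidate;
        }
        throw new IllegalStateException("No provider found");
    }

    public static void main(String[] args) {
        System.out.println(requireSingle(List.of("only-one")));
        try {
            requireSingle(List.of("first", "second"));
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Failing fast on a conflict, instead of silently picking the first factory, avoids behavior that depends on classpath ordering; the error message in the original code asks the user to specify the desired client explicitly.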

ServiceHelper

langchain4j-core/src/main/java/dev/langchain4j/spi/ServiceHelper.java

    /**
     * Load all the services of a given type.
     *
     * @param clazz the type of service
     * @param <T>   the type of service
     * @return the list of services, empty if none
     */
    public static <T> Collection<T> loadFactories(Class<T> clazz) {
        return loadFactories(clazz, null);
    }

    /**
     * Load all the services of a given type.
     *
     * <p>Utility mechanism around {@code ServiceLoader.load()}</p>
     *
     * <ul>
     *     <li>If classloader is {@code null}, will try {@code ServiceLoader.load(clazz)}</li>
     *     <li>If classloader is not {@code null}, will try {@code ServiceLoader.load(clazz, classloader)}</li>
     * </ul>
     *
     * <p>If the above return nothing, will fall back to {@code ServiceLoader.load(clazz, $this class loader$)}</p>
     *
     * @param clazz       the type of service
     * @param classLoader the classloader to use, may be null
     * @param <T>         the type of service
     * @return the list of services, empty if none
     */
    public static <T> Collection<T> loadFactories(Class<T> clazz, /* @Nullable */ ClassLoader classLoader) {
        List<T> result;
        if (classLoader != null) {
            result = loadAll(ServiceLoader.load(clazz, classLoader));
        } else {
            // this is equivalent to:
            // ServiceLoader.load(clazz, TCCL);
            result = loadAll(ServiceLoader.load(clazz));
        }
        if (result.isEmpty()) {
            // By default, ServiceLoader.load uses the TCCL, this may not be enough in environment dealing with
            // classloaders differently such as OSGi. So we should try to use the classloader having loaded this
            // class. In OSGi it would be the bundle exposing vert.x and so have access to all its classes.
            result = loadAll(ServiceLoader.load(clazz, ServiceHelper.class.getClassLoader()));
        }
        return result;
    }

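ServiceHelper is a thin wrapper around the JDK's ServiceLoader.load(): it uses the given class loader if there is one, otherwise the thread context class loader (TCCL), and falls back to the class loader that loaded ServiceHelper when nothing is found.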
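The lookup order can be tried out with the JDK alone. The sketch below follows the same three steps; ServiceLookupSketch, Plugin, and load are hypothetical names for illustration, and since no META-INF/services entry is registered for Plugin, the lookup comes back empty after the fallback as well.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.ServiceLoader;

// Simplified sketch of the lookup order; Plugin is a hypothetical service type.
class ServiceLookupSketch {

    interface Plugin {
        String name();
    }

    // Drains the lazy ServiceLoader iterator into a list.
    static <T> List<T> loadAll(ServiceLoader<T> loader) {
        List<T> result = new ArrayList<>();
        loader.forEach(result::add);
        return result;
    }

    static <T> List<T> load(Class<T> type, ClassLoader classLoader) {
        List<T> result = (classLoader != null)
                ? loadAll(ServiceLoader.load(type, classLoader))
                : loadAll(ServiceLoader.load(type)); // uses the thread context class loader
        if (result.isEmpty()) {
            // Fall back to the class loader that loaded this class.
            result = loadAll(ServiceLoader.load(type, ServiceLookupSketch.class.getClassLoader()));
        }
        return result;
    }

    public static void main(String[] args) {
        // No provider is registered for Plugin, so the list is empty.
        System.out.println(load(Plugin.class, null).size());
    }
}
```

A real provider would be registered by listing its class name in a META-INF/services file named after the service interface, which is how the JDK's ServiceLoader discovers implementations.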

langchain4j-http-client-jdk

JdkHttpClientBuilderFactory

dev/langchain4j/http/client/jdk/JdkHttpClientBuilderFactory.java

public class JdkHttpClientBuilderFactory implements HttpClientBuilderFactory {

    @Override
    public JdkHttpClientBuilder create() {
        return JdkHttpClient.builder();
    }
}


JdkHttpClientBuilderFactory's create method returns a JdkHttpClientBuilder.

JdkHttpClientBuilder

dev/langchain4j/http/client/jdk/JdkHttpClientBuilder.java

public class JdkHttpClientBuilder implements HttpClientBuilder {

    private java.net.http.HttpClient.Builder httpClientBuilder;
    private Duration connectTimeout;
    private Duration readTimeout;

    public java.net.http.HttpClient.Builder httpClientBuilder() {
        return httpClientBuilder;
    }

    public JdkHttpClientBuilder httpClientBuilder(java.net.http.HttpClient.Builder httpClientBuilder) {
        this.httpClientBuilder = httpClientBuilder;
        return this;
    }

    @Override
    public Duration connectTimeout() {
        return connectTimeout;
    }

    @Override
    public JdkHttpClientBuilder connectTimeout(Duration connectTimeout) {
        this.connectTimeout = connectTimeout;
        return this;
    }

    @Override
    public Duration readTimeout() {
        return readTimeout;
    }

    @Override
    public JdkHttpClientBuilder readTimeout(Duration readTimeout) {
        this.readTimeout = readTimeout;
        return this;
    }

    @Override
    public JdkHttpClient build() {
        return new JdkHttpClient(this);
    }
}

JdkHttpClientBuilder's build method constructs a JdkHttpClient.

JdkHttpClient

dev/langchain4j/http/client/jdk/JdkHttpClient.java

public class JdkHttpClient implements HttpClient {

    private final java.net.http.HttpClient delegate;
    private final Duration readTimeout;

    public JdkHttpClient(JdkHttpClientBuilder builder) {
        java.net.http.HttpClient.Builder httpClientBuilder =
                getOrDefault(builder.httpClientBuilder(), java.net.http.HttpClient::newBuilder);
        if (builder.connectTimeout() != null) {
            httpClientBuilder.connectTimeout(builder.connectTimeout());
        }
        this.delegate = httpClientBuilder.build();
        this.readTimeout = builder.readTimeout();
    }

    public static JdkHttpClientBuilder builder() {
        return new JdkHttpClientBuilder();
    }

    @Override
    public SuccessfulHttpResponse execute(HttpRequest request) throws HttpException {
        try {
            java.net.http.HttpRequest jdkRequest = toJdkRequest(request);

            java.net.http.HttpResponse<String> jdkResponse = delegate.send(jdkRequest, BodyHandlers.ofString());

            if (!isSuccessful(jdkResponse)) {
                throw new HttpException(jdkResponse.statusCode(), jdkResponse.body());
            }

            return fromJdkResponse(jdkResponse, jdkResponse.body());
        } catch (IOException | InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public void execute(HttpRequest request, ServerSentEventParser parser, ServerSentEventListener listener) {
        java.net.http.HttpRequest jdkRequest = toJdkRequest(request);

        delegate.sendAsync(jdkRequest, BodyHandlers.ofInputStream())
                .thenAccept(jdkResponse -> {

                    if (!isSuccessful(jdkResponse)) {
                        listener.onError(new HttpException(jdkResponse.statusCode(), readBody(jdkResponse)));
                        return;
                    }

                    SuccessfulHttpResponse response = fromJdkResponse(jdkResponse, null);
                    listener.onOpen(response);

                    try (InputStream inputStream = jdkResponse.body()) {
                        parser.parse(inputStream, listener);
                        listener.onClose();
                    } catch (IOException e) {
                        throw new RuntimeException(e);
                    }
                })
                .exceptionally(throwable -> {
                    if (throwable.getCause() instanceof HttpTimeoutException) {
                        listener.onError(throwable);
                    }
                    return null;
                });
    }

    //......
}    

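JdkHttpClient's constructor creates a java.net.http.HttpClient as the delegate, applying connectTimeout when it is set and keeping readTimeout as a field. The blocking execute uses delegate.send, while the streaming execute uses delegate.sendAsync and feeds the response InputStream to the parser. Note that the exceptionally branch only forwards a failure to the listener when its cause is an HttpTimeoutException.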
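In java.net.http, the connect timeout is a client-wide setting, while the only other timeout is set per request on HttpRequest.Builder. That is presumably why JdkHttpClient stores readTimeout instead of passing it to the client builder. The sketch below shows where each value would go; mapping readTimeout onto the per-request timeout is an assumption, since toJdkRequest is elided in the listing above. JdkTimeoutSketch and its methods are hypothetical names, and nothing is sent over the network.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.time.Duration;

// Sketch only: shows where each timeout lives in java.net.http; no request is sent.
class JdkTimeoutSketch {

    static HttpClient newClient(Duration connectTimeout) {
        HttpClient.Builder builder = HttpClient.newBuilder();
        if (connectTimeout != null) {
            builder.connectTimeout(connectTimeout); // client-wide setting
        }
        return builder.build();
    }

    // Assumption: a "read timeout" is applied as the per-request timeout.
    static HttpRequest newRequest(String url, Duration readTimeout) {
        HttpRequest.Builder builder = HttpRequest.newBuilder(URI.create(url)).GET();
        if (readTimeout != null) {
            builder.timeout(readTimeout); // per-request setting
        }
        return builder.build();
    }

    public static void main(String[] args) {
        HttpClient client = newClient(Duration.ofSeconds(5));
        HttpRequest request = newRequest("https://example.com/v1/chat", Duration.ofSeconds(30));
        System.out.println(client.connectTimeout() + " " + request.timeout());
    }
}
```

Both values are optional in the JDK API, which matches the null checks in the constructor: when no timeout is configured, the JDK defaults apply.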

langchain4j-http-client-spring-restclient

SpringRestClientBuilderFactory

dev/langchain4j/http/client/spring/restclient/SpringRestClientBuilderFactory.java

public class SpringRestClientBuilderFactory implements HttpClientBuilderFactory {

    @Override
    public SpringRestClientBuilder create() {
        return SpringRestClient.builder();
    }
}


SpringRestClientBuilderFactory's create method returns a SpringRestClientBuilder.

SpringRestClientBuilder

dev/langchain4j/http/client/spring/restclient/SpringRestClientBuilder.java

public class SpringRestClientBuilder implements HttpClientBuilder {

    private RestClient.Builder restClientBuilder;
    private AsyncTaskExecutor streamingRequestExecutor;
    private Boolean createDefaultStreamingRequestExecutor = true;
    private Duration connectTimeout;
    private Duration readTimeout;

    public RestClient.Builder restClientBuilder() {
        return restClientBuilder;
    }

    public SpringRestClientBuilder restClientBuilder(RestClient.Builder restClientBuilder) {
        this.restClientBuilder = restClientBuilder;
        return this;
    }

    public AsyncTaskExecutor streamingRequestExecutor() {
        return streamingRequestExecutor;
    }

    public SpringRestClientBuilder streamingRequestExecutor(AsyncTaskExecutor streamingRequestExecutor) {
        this.streamingRequestExecutor = streamingRequestExecutor;
        return this;
    }

    public Boolean createDefaultStreamingRequestExecutor() {
        return createDefaultStreamingRequestExecutor;
    }

    public SpringRestClientBuilder createDefaultStreamingRequestExecutor(Boolean createDefaultStreamingRequestExecutor) {
        this.createDefaultStreamingRequestExecutor = createDefaultStreamingRequestExecutor;
        return this;
    }

    @Override
    public Duration connectTimeout() {
        return connectTimeout;
    }

    @Override
    public SpringRestClientBuilder connectTimeout(Duration connectTimeout) {
        this.connectTimeout = connectTimeout;
        return this;
    }

    @Override
    public Duration readTimeout() {
        return readTimeout;
    }

    @Override
    public SpringRestClientBuilder readTimeout(Duration readTimeout) {
        this.readTimeout = readTimeout;
        return this;
    }

    @Override
    public SpringRestClient build() {
        return new SpringRestClient(this);
    }
}

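SpringRestClientBuilder's build method constructs a SpringRestClient. In addition to the two timeouts, the builder accepts a RestClient.Builder and an AsyncTaskExecutor for streaming requests.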

SpringRestClient

dev/langchain4j/http/client/spring/restclient/SpringRestClient.java

public class SpringRestClient implements HttpClient {

    private final RestClient delegate;
    private final AsyncTaskExecutor streamingRequestExecutor;

    public SpringRestClient(SpringRestClientBuilder builder) {

        RestClient.Builder restClientBuilder = getOrDefault(builder.restClientBuilder(), RestClient::builder);

        ClientHttpRequestFactorySettings settings = ClientHttpRequestFactorySettings.DEFAULTS;
        if (builder.connectTimeout() != null) {
            settings = settings.withConnectTimeout(builder.connectTimeout());
        }
        if (builder.readTimeout() != null) {
            settings = settings.withReadTimeout(builder.readTimeout());
        }
        ClientHttpRequestFactory clientHttpRequestFactory = ClientHttpRequestFactories.get(settings);

        this.delegate = restClientBuilder
                .requestFactory(clientHttpRequestFactory)
                .build();

        this.streamingRequestExecutor = getOrDefault(builder.streamingRequestExecutor(), () -> {
            if (builder.createDefaultStreamingRequestExecutor()) {
                return createDefaultStreamingRequestExecutor();
            } else {
                return null;
            }
        });
    }

    //......

    @Override
    public SuccessfulHttpResponse execute(HttpRequest request) throws HttpException {
        try {
            ResponseEntity<String> responseEntity = toSpringRestClientRequest(request)
                    .retrieve()
                    .toEntity(String.class);

            return SuccessfulHttpResponse.builder()
                    .statusCode(responseEntity.getStatusCode().value())
                    .headers(responseEntity.getHeaders())
                    .body(responseEntity.getBody())
                    .build();
        } catch (RestClientResponseException e) {
            throw new HttpException(e.getStatusCode().value(), e.getMessage());
        }
    }

    @Override
    public void execute(HttpRequest request, ServerSentEventParser parser, ServerSentEventListener listener) {
        streamingRequestExecutor.execute(() -> {
            try {
                toSpringRestClientRequest(request)
                        .exchange((springRequest, springResponse) -> {

                            int statusCode = springResponse.getStatusCode().value();

                            if (!springResponse.getStatusCode().is2xxSuccessful()) {
                                String body = springResponse.bodyTo(String.class);
                                listener.onError(new HttpException(statusCode, body));
                                return null;
                            }

                            SuccessfulHttpResponse response = SuccessfulHttpResponse.builder()
                                    .statusCode(statusCode)
                                    .headers(springResponse.getHeaders())
                                    .build();
                            listener.onOpen(response);

                            try (InputStream inputStream = springResponse.getBody()) {
                                parser.parse(inputStream, listener);
                                listener.onClose();
                            }

                            return null;
                        });
            } catch (Exception e) {
                if (e.getCause() instanceof SocketTimeoutException) {
                    listener.onError(e);
                }
            }
        });
    }

}    

SpringRestClient's constructor builds an org.springframework.web.client.RestClient as the delegate and also sets up a streamingRequestExecutor, which runs the streaming (ServerSentEventListener) requests on a separate thread.
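RestClient is a blocking API, so the streaming execute hands the whole exchange to an executor and returns immediately. The pattern can be sketched with java.util.concurrent alone. StreamingOffloadSketch, its Listener interface, and the stream and run methods are hypothetical names with no Spring types involved, and unlike the listing above this sketch forwards every exception to onError, not only timeouts.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

// Hypothetical sketch of the offloading pattern; no Spring types involved.
class StreamingOffloadSketch {

    interface Listener {
        void onEvent(String data);
        void onError(Throwable error);
        void onClose();
    }

    // Returns immediately; the blocking work runs on the executor's thread.
    static void stream(ExecutorService executor, Supplier<List<String>> blockingCall, Listener listener) {
        executor.execute(() -> {
            try {
                for (String chunk : blockingCall.get()) {
                    listener.onEvent(chunk);
                }
                listener.onClose();
            } catch (Exception e) {
                listener.onError(e);
            }
        });
    }

    // Demo helper: runs one streaming call and waits until the listener is done.
    static List<String> run(Supplier<List<String>> blockingCall) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        List<String> log = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(1);
        stream(executor, blockingCall, new Listener() {
            @Override
            public void onEvent(String data) {
                log.add("event:" + data);
            }

            @Override
            public void onError(Throwable error) {
                log.add("error:" + error.getMessage());
                done.countDown();
            }

            @Override
            public void onClose() {
                log.add("close");
                done.countDown();
            }
        });
        try {
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            executor.shutdown();
        }
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run(() -> List.of("a", "b")));
    }
}
```

In the Spring implementation the executor is configurable through the builder, so an application can reuse its own AsyncTaskExecutor instead of the default one.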

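Summary

The langchain4j-http-client module of langchain4j defines the HttpClient SPI (Service Provider Interface). Two implementations are currently available out of the box:

  • JdkHttpClient in the langchain4j-http-client-jdk module, which uses java.net.http.HttpClient
  • SpringRestClient in the langchain4j-http-client-spring-restclient module, which uses org.springframework.web.client.RestClient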

doc

  • customizable-http-client
