フレクトのクラウドblog re:newal

http://blog.flect.co.jp/cloud/からさらに引っ越しています

Einstein Copilot と Data Cloud Vector Database を連携する

こんにちは。エンジニアの山下です。

今回は Einstein Copilot と Data Cloud Vector Database を連携する方法について書こうと思います。

Einstein Copilot は Salesforce が提供する LLM によるアシスタントサービスで、チャットベースで LLM に指示を行うことで、自然言語によるレコード検索やメールの文面の下書きなどを行なうことができます。地味に手間のかかる作業を LLM に一任できるため、Salesforce を普段使いする人に重宝されそうなサービスです。

Einstein Copilot の微妙な点は、デフォルトで搭載されている検索機能が CRM レコードのみを対象としており、Data Cloud のデータを検索するためには利用することができないことです。

Data Cloud に取り込んだ外部データを Einstein Copilot で扱えるようになれば、任意のデータを活用した、より柔軟な CRM 活動が行えるようになりそうな気がします。実際、そのシナジーについては Salesforce公式の記事 でも「最も強力な例の1つ」として紹介されているほどで、外部に活用可能なデータがある場合、連携しない理由はほとんどないと言ってもよいのではないでしょうか。

というわけで、何とか連携したいよねという話になるのですが、残念なことに、これらのサービスの連携方法についての公式情報はほとんどなく、あったとしても概ね「頑張って APEX で連携してください」という内容に留まっています。

しかし、せっかく可能性がありそうなものを放っておくのはもったいないので、本記事で具体的な連携手順を紹介し、このギャップを埋めたいと思います。

やること

今回は Einstein Copilot に Custom Action を追加して Data Cloud Vector Database を利用したベクトル検索を行います。

Einstein Copilot から Data Cloud Vector Database までの流れは以下の通りです。

Einstein Copilot
→ Custom Action
→ Flow
→ APEX Action
→ Data Cloud Vector Database

Einstein Copilot の Custom Action には Flow が利用できるので、Flow 経由で APEX を実行して Data Cloud Vector Database へのクエリを発行します。

Data Cloud Vector Database のクエリ発行はやや煩雑で、Connected App で認証を行った上で Query API を利用する必要があります。つまり、インターネット経由で認証を行って API を呼び出すという形になります。Salesforce のサービス同士を連携させたいだけなのにインターネットを経由するのかという複雑な気持ちになりますが、残念ながら現状では他の連携手段はなさそうです。公式の記事で謳っている連携なのでは?

その他、Data Cloud の Query API を利用するには専用のアクセストークンが必要で、Connected App から取得したアクセストークンを直接使うことができないなど、Data Cloud には微妙に融通が効かない点がいくつかあります。これらの注意事項については必要になったタイミングで適宜述べたいと思います。

というわけで、早速実装していきます。

Data Cloud Vector Database の準備

まずは Data Cloud Vector Database を利用するために必要な Search Index を作成します。

とはいえ、特に難しい点はなく、作成画面から Easy Setup を選択後、検索対象となる DMO を選んで Save を押下するだけで完了です。

Easy Setup を選択した場合、使用する Embedding Model や検索対象のフィールドを自動で判別して設定してくれます。

自動で設定される内容が気に食わない場合は Advanced Setup から自分で設定することも可能です。選択可能な Embedding Model は以下になります。

また Advanced Setup を選択した場合でもチャンクサイズは指定できず、自動で決定されてしまうようです。このあたりのチューニング項目の強化については今後に期待したいですね。

Search Index を作成すると、以下の API 名を持つ DMO が定義されます。

  • (your-dmo-name)_index__dlm
  • (your-dmo-name)_chunk__dlm

前者が Embedding を保持する検索用インデックスで、後者がインデックスに対応する文章を保持するチャンクです。これらの DMO を Query APISQL で指定することでベクトル検索が実行可能になります。

Connected App の準備

冒頭でも述べましたが、Data Cloud Vector Database へはインターネット経由でアクセスする必要があるため、Connected App の準備が必要です。

Enable OAuth Settings と Enable Client Credentials Flow にチェックを入れ、OAuth Scope に以下を設定すれば OK です。

  • Manage user data via APIs (api)
  • Perform ANSI SQL queries on Data Cloud data (cdp_query_api)

APEX Action の定義

APEX から Data Cloud Vector Database をクエリする処理を書いていきます。

やりたいことは Data Cloud の Query API の呼び出しですが、以下に示すように、これには都合3回の API 呼び出しが必要になります。

  1. Connected App からアクセストークンを取得する
  2. 1 のアクセストークンを Data Cloud 用のアクセストークンと交換する
  3. 2 のアクセストークンを使って Data Cloud の Query API を呼び出す

注意が必要なのは 2 の工程で、実は Connected App から取得したアクセストークンをそのまま Query API のために使用することはできません。Query API を呼ぶためには、Connected App から取得したアクセストークンを Data Cloud 用のアクセストークンに変換する必要があります。この変換処理は Salesforce が提供するトークン交換のための API エンドポイントを通じて実行できます。

以下の各節でそれぞれの処理を実装していきます。

Connected App からアクセストークンを取得する

これは割とありふれた処理のため、特に難しいポイントはありません。

    private static TokenResponese retrieveSalesforceToken(String endpoint, String clientId, String clientSecret) {
        Map<String, String> params = new Map<String, String>();
        params.put('grant_type', 'client_credentials');
        params.put('client_id', clientId);
        params.put('client_secret', clientSecret);
        HttpResponse res = post(endpoint, params);

        JSONParser parser = JSON.createParser(res.getBody());
        return (TokenResponese) parser.readValueAs(TokenResponese.class);
    }

    private static HttpResponse post(String url, Map<String, String> params) {
        HttpRequest req = new HttpRequest();
        req.setEndpoint(url);
        req.setMethod('POST');
        req.setHeader('Content-Type', 'application/x-www-form-urlencoded');
        req.setBody(toParamStr(params));
        return new Http().send(req);
    }

    private static String toParamStr(Map<String, String> params) {
        List<String> out = new List<String>();
        for (String key : params.keySet()) {
            out.add(key + '=' + params.get(key));
        }
        return String.join(out, '&');
    }

    private class TokenResponese {
        String access_token;
        String instance_url;
    }

やっていることはシンプルなのですが、APEX だと無駄にコードが膨らんでつらいですね ... 。

アクセストークンを Data Cloud 用のアクセストークンと交換する

Salesforceトークン交換エンドポイントは OAuth 2.0 の Custom Grant Type で実装されているようなので、基本的には先ほど retrieveSalesforceToken でやった内容を別のエンドポイントに対してもう一度行う形になります。

Custom Grant Type の内容さえ把握してしまえば、こちらも特に難しいポイントはありません。

    private static TokenResponese exchageSalesforceTokenForDataCloudToken(String endpoint, String sfToken) {
        Map<String, String> params = new Map<String, String>();
        params.put('grant_type', 'urn:salesforce:grant-type:external:cdp');
        params.put('subject_token', sfToken);
        params.put('subject_token_type', 'urn:ietf:params:oauth:token-type:access_token');
        HttpResponse res = post(endpoint, params);

        JSONParser parser = JSON.createParser(res.getBody());
        return (TokenResponese) parser.readValueAs(TokenResponese.class);
    }

どちらも OAuth の Token エンドポイント準拠なので、ほとんどのコードが流用可能で幸せですね。できればトークン交換自体不要にしてくれないか。

Data Cloud の Query API を呼び出す

まずは Query API を呼ぶための雛形から実装します。

    private static HttpResponse query(String endpoint, String token, String sql) {
        Map<String, String> headers = new Map<String, String>();
        headers.put('Authorization', 'Bearer ' + token);

        String json = '{"sql": "' + sql + '"}';

        return postJson(endpoint, json, headers);
    }

    private static HttpResponse postJson(String url, String json, Map<String, String> headers) {
        HttpRequest req = new HttpRequest();
        req.setEndpoint(url);
        req.setMethod('POST');
        req.setHeader('Content-Type', 'application/json');
        req.setBody(json);

        for (String key : headers.keySet()) {
            req.setHeader(key, headers.get(key));
        }

        return new Http().send(req);
    }

Authorization ヘッダーを指定して JSON を POST しているだけなので、この時点では特に難しい点はありません。

Query API の戸惑うポイントは引数に指定する SQL の書き方で、ベクトル検索を行う場合は vector_search という専用の関数を利用する必要があります。これは以下のような記法で利用します。

SELECT * FROM
  vector_search(
    TABLE(<your-index-dmo>),
    '<search-keyword>',
    '<pre-filetering-condition>',
    <top-k>
  )

top-k は検索結果の上位何件を取得するかを指定するための引数で、整数値を指定します。pre-filetering-condition はベクトル検索の前にレコードを絞り込むための条件を記述するための引数です。今回は pre-filtering-condition は使用しません。

上記のプレースホルダを実際の引数で埋めたサンプルは以下になります。

SELECT * FROM
  vector_search(TABLE(your_index__dlm), 'test', '', 1)

この場合は your_index__dlm という Search Index の DMO を使って test というキーワードでベクトル検索を行い、スコア上位1件を取得するという内容になります。なお、絞り込み条件に空文字列を指定しているため、レコードの絞り込みは行われません。

これだけでもベクトル検索自体は可能なのですが、インデックスには Embedding の情報しかないため、結果がやや殺風景になってしまいます。そのため、以下のように原文の情報を持つチャンクを JOIN して取得してあげるのがベターです。

SELECT
  index.score__c
, chunk.Chunk__c
FROM
  vector_search(TABLE(your_index__dlm), 'test', '', 1) index
JOIN
  your_chunk__dlm chunk
    ON index.RecordId__c = chunk.RecordId__c

というわけで、引数の SQL も明確になったところで最後の仕上げです。これまで定義してきた関数群を繋ぎ合わせます。

public class EdgeTechTest {

    private static String SF_TOKEN_ENDPOINT = 'https://(your-org).my.salesforce.com/services/oauth2/token';
    private static String SF_TOKEN_EXCHANGE_ENDPOINT = 'https://(your-org).my.salesforce.com/services/a360/token';
    private static String SF_CLIENT_ID = '(your-client-id)';
    private static String SF_CLIENT_SECRET = '(your-client-secret)';

    @InvocableMethod
    public static List<String> getAccountNames(List<String> keywords) {
        TokenResponese sfToken = retrieveSalesforceToken(SF_TOKEN_ENDPOINT, SF_CLIENT_ID, SF_CLIENT_SECRET);
        TokenResponese dcToken = exchageSalesforceTokenForDataCloudToken(SF_TOKEN_EXCHANGE_ENDPOINT, sfToken.access_token);

        String keyword = keywords[0];
        String queryEndpoint = toQueryEndpoint(dcToken.instance_url);
        String sql = '';
        sql += ' SELECT';
        sql += '   index.score__c';
        sql += ' , chunk.Chunk__c';
        sql += ' FROM';
        sql += '   vector_search(TABLE(your_index__dlm), \'' + keyword + '\', \'\', 1) index';
        sql += ' JOIN';
        sql += '   your_chunk__dlm chunk';
        sql += '     ON index.RecordId__c = chunk.RecordId__c';
        HttpResponse res = query(queryEndpoint, dcToken.access_token, sql);

        List<String> result = new List<String>();
        result.add(res.getBody());

        return result;
    }

    private static String toQueryEndpoint(String instanceUrl) {
        return 'https://' + instanceUrl + '/api/v2/query';
    }

    // ... 今まで定義した関数群が以下に続く ...
}

なお、シークレット等を定数で雑に定義していますが、本来は適切に管理すべきです。今回は割愛します。

Flow を定義する

Custom Action で使用する Flow を定義していきます。定義方法は通常の Flow と同じです。

Einstein Copilot が Flow を利用する際の入出力の内容は Flow のリソースで定義します。リソース作成画面にある Available for Input / Output のチェックボックスをチェックすることで、自動的に Flow の入出力の対象としてリソースが認識されるようになります。

今回は Input として検索キーワードを受け取り、ベクトル検索を行って得た結果を Output として返す形で Flow を実装します。特に難しい点はないので、詳細は割愛します。

Custom Action を定義する

Setup にある Einstein Copilot Actions から Custom Action を定義できます。

New Copilot Action を押下し、画面で先ほど作成した Flow を指定します。続く画面で、先ほど Flow で指定したリソースが以下のように Input / Output に項目として表示されます。

この画面では Custom Action 全体および Input / Output の各項の内容についての説明を自然言語で与える必要があります。Einstein Copilot はここで与えた説明文を見て、Custom Action の呼び出し要否、および Input / Output の内容を自動的に判断します。

他、入力パラメータが必須かどうかなどをチェックボックスで指定します。なお、今回筆者が設定した内容は上の画像にある通りです。

Einstein Copilot に Custom Action を設定する

Copilot Builder で作成した Custom Action を設定します。Copilot Builder は Setup の Einstein Copilots の項から開けます。

Custom Action の設定を行うには一度 Einstein Copilot を Deactivate しなければならないので、忘れずに Deactivate しておきます。その後は Copilot Action Library から先ほど作成した Custom Action をアサインするだけです。

アサイン後、Deactivate した Einstein Copilot は忘れずに Activate しておきます。

これで全ての手順が完了です。

テスト

というわけでテストしてみます。

検索対象となるデータは何でもよいのですが、今回は LangChain のサンプルを拝借しました。このサンプルでは以下の5つの文章を対象にベクトル検索を行います。

  • Dogs are great companions, known for their loyalty and friendliness.
  • Cats are independent pets that often enjoy their own space.
  • Goldfish are popular pets for beginners, requiring relatively simple care.
  • Parrots are intelligent birds capable of mimicking human speech.
  • Rabbits are social animals that need plenty of space to hop around.

LangChain のドキュメントには Cat で検索した場合は上から2番目の文章が、Shark で検索した場合は上から3番目の文章が結果として得られたと書いてあります。この内容を Einstein Copilot で試してみましょう。

Copilot Builder の Conversation Preview で試してみます。まずは Cat から。

JSON をそのまま表示しているので少し見づらいですが、画像右ペインで上から2番目の文章が返ってきていることがわかります。画像左ペインでは Einstein Copilot が入力として Cat を指定していることがわかります。いい感じですね。

続いて Shark を指定した場合も見てみます。

上から3番目の文章が返ってきていますね。本文中に直接含まれていない単語のためか、スコアは先ほどの結果よりもやや低めになっています。

今回はレスポンスの JSON をそのまま表示していますが、これを Prompt Template に埋め込んで LLM にいい感じのレスポンスを生成してもらったり等、カスタマイズの幅は結構ありそうです。

まとめ

というわけで、Einstein Copilot と Data Cloud を連携させてベクトル検索を行うことができました。

冒頭でも述べた通り、Einstein Copilot は Salesforce を普段使いする人には日常の効率に関わる機能なので、データの参照先が CRM 内部に限るというのは非常に惜しいと感じていました。今回示した連携によってその制約を排除できるのは結構大きいのではないかと思っています。

Data Cloud Vector Database と直接の連携ができず、一度インターネットを経由する必要があるのは微妙なところですが、このあたりは今後の機能改善に期待したいですね。

以上です。最後までお読みいただき、ありがとうございました。