banner
子文

子文

世界,你好鸭~
x
github

Airbyteの実践

Airbyte は 2020 年 7 月に発表された最新のオープンソース ETL ツールの一つです。他の ETL ツールとは異なり、Airbyte は UI と API を通じて即使用可能なコネクタを提供し、コミュニティの開発者がこのツールを監視および維持できるようにしています。コネクタは Docker コンテナとして実行され、選択した言語で構築できます。モジュール式コンポーネントとオプション機能のサブセットを提供することで、Airbyte はより大きな柔軟性を提供します。

クイックスタート#

ローカルに Airbyte をデプロイする#

git clone https://github.com/airbytehq/airbyte.git
cd airbyte
docker-compose up

起動後にアクセス: http://localhost:8000

データソースを追加する#

image-20220619122412393

ターゲットソースを追加する#

まず、docker-compose を使用してローカルで Postgres データベースを起動します。

  1. mkdir ./data

  2. docker-compose.yaml を作成します:

    version: "3"
    services:
      postgres:
        image: postgres:14
        container_name: postgres
        restart: always
        environment:
          POSTGRES_DB: postgres
          POSTGRES_USER: postgres
          POSTGRES_PASSWORD: 123456
        ports:
            - 5432:5432
        volumes:
          - ./data:/var/lib/postgresql/data
    
  3. docker-compose up -dを実行して起動します。

次に、Airbyte でターゲットソースを設定します。

image-20220619134741698

接続を確立する#

image-20220619134925755

image-20220619134942144

image-20220619135033394

同期をトリガーする#

image-20220619135226032

結果を確認する#

image-20220619135522388

Airbyte CDK を使用してカスタム API データソースを開発する#

データソーステンプレートを作成する#

cd airbyte-integrations/connector-templates/generator # Airbyteプロジェクトのルートから開始することを前提としています。
# NPMがない場合は、https://www.npmjs.com/get-npmからインストールしてください
./generate.sh

Python HTTP API Sourceを選択し、データソース名を入力します。ここではsolaを使用します。

image-20220619140718389

Python 依存関係をインストールする#

cd ../../connectors/source-sola
python -m venv .venv # .venvディレクトリに仮想環境を作成します
source .venv/bin/activate # venvを有効にします
pip install -r requirements.txt

サンプルを実行する#

python main.py spec

以下の内容が出力されれば、プロジェクトの初期化が成功したことを示します。

{"type": "SPEC", "spec": {"documentationUrl": "https://docsurl.com", "connectionSpecification": {"$schema": "http://json-schema.org/draft-07/schema#", "title": "Solar Spec", "type": "object", "required": ["TODO"], "additionalProperties": false, "properties": {"TODO": {"type": "string", "description": "describe me"}}}}}

入力パラメータを定義する#

これを実現する最も簡単な方法は、spec.yamlファイルを作成し、source_<name>/spec.yamlConnectorSpecificationスキーマに従ってコネクタの入力を記述します。ソースコードを開発する際の良い出発点です。JsonSchema を使用して、入力が何であるか(例:ユーザー名とパスワード)を定義します。

vi ./source_solar/spec.yaml
documentationUrl: https://api.le-systeme-solaire.net/swagger/#/
connectionSpecification:
  $schema: http://json-schema.org/draft-07/schema#
  title: Solar System openData
  type: object
  required:
    - page_size
  additionalProperties: false
  properties:
    page_size:
      type: integer
      title: Page size
      description: ページサイズ
      default: 10
      minimum: 1
      pattern: \d+

接続の健康チェック#

この操作は、ユーザーが提供した入力構成が基盤となるデータソースに接続できるかどうかを検証します。ユーザーが提供した構成には、spec.yamlに記載された値が含まれています。API にリクエストを送信するには、アクセス権を指定する必要があります。この例では、API は資格情報を必要としないため、かなり簡単なチェックです。./source_solar/source.pyを編集します:

class SolarStream(HttpStream, ABC):
    """
    TODO このコメントを削除する

    このクラスはコネクタによって出力されるストリームを表します。
    これは、APIレベルでのすべての共通機能を含むことを目的とした抽象基底クラスです。例えば、APIの基本URL、ページネーション戦略、
    レスポンスの解析などです。

    各ストリームは、このクラス(またはその抽象サブクラス)を拡張して、そのストリームに特有の動作を指定する必要があります。

    通常、REST APIの場合、各ストリームはAPI内のリソースに対応します。例えば、APIが次のエンドポイントを含む場合
        - GET v1/customers
        - GET v1/employees

    その場合、次の3つのクラスを持つ必要があります:
    `class SolarStream(HttpStream, ABC)`は現在のクラスです。
    `class Customers(SolarStream)`は、v1/customersを使用して顧客のデータを取得する動作を含みます。
    `class Employees(SolarStream)`は、v1/employeesを使用して従業員のデータを取得する動作を含みます。

    一部のストリームが増分同期を実装する場合、通常は別のクラスを作成します。
    `class IncrementalSolarStream((SolarStream), ABC)`を作成し、具体的なストリーム実装がそれを拡張します。以下に例を示します。

    設定可能なオプションの完全なリストについては、リファレンスドキュメントを参照してください。
    """

    # TODO: URLベースを埋める。必須。
    url_base = "https://api.le-systeme-solaire.net/rest/"
    
...
class SourceSolar(AbstractSource):
    def check_connection(self, logger, config) -> Tuple[bool, any]:
        """
        https://github.com/airbytehq/airbyte/blob/master/airbyte-integrations/connectors/source-stripe/source_stripe/source.py#L232
        の例を参照してください。

        :param config:  コネクタのspec.yamlに準拠したユーザー入力構成オブジェクト
        :param logger:  ロガーオブジェクト
        :return Tuple[bool, any]: (True, None)は、入力構成がAPIに正常に接続できる場合、(False, error)はそれ以外の場合。
        """
        resp = requests.get(SolarStream.url_base)
        status = resp.status_code
        logger.info(f"Ping response code: {status}")
        if status == 200:
            return True, None
        else:
            message = resp.text        
        return False, message

入力ファイルを作成し、チェックを実行します。

mkdir sample_files
echo '{"page_size": 10}' > ./sample_files/secrets/config.json
python main.py check --config ./sample_files/secrets/config.json

以下の内容が表示されれば、接続チェックが成功したことを示します。

{"type": "LOG", "log": {"level": "INFO", "message": "Ping response code: 200"}}
{"type": "LOG", "log": {"level": "INFO", "message": "Check succeeded"}}
{"type": "CONNECTION_STATUS", "connectionStatus": {"status": "SUCCEEDED"}}

データストリームを宣言する#

インターフェースのデータストリームを宣言するには、次の手順を実行するだけです。

  1. ./source-solar/source.pyに HttpStream クラスを追加します。
class SolarStream(HttpStream, ABC):
    """
    TODO このコメントを削除する

    このクラスはコネクタによって出力されるストリームを表します。
    これは、APIレベルでのすべての共通機能を含むことを目的とした抽象基底クラスです。例えば、APIの基本URL、ページネーション戦略、
    レスポンスの解析などです。

    各ストリームは、このクラス(またはその抽象サブクラス)を拡張して、そのストリームに特有の動作を指定する必要があります。

    通常、REST APIの場合、各ストリームはAPI内のリソースに対応します。例えば、APIが次のエンドポイントを含む場合
        - GET v1/customers
        - GET v1/employees

    その場合、次の3つのクラスを持つ必要があります:
    `class SolarStream(HttpStream, ABC)`は現在のクラスです。
    `class Customers(SolarStream)`は、v1/customersを使用して顧客のデータを取得する動作を含みます。
    `class Employees(SolarStream)`は、v1/employeesを使用して従業員のデータを取得する動作を含みます。

    一部のストリームが増分同期を実装する場合、通常は別のクラスを作成します。
    `class IncrementalSolarStream((SolarStream), ABC)`を作成し、具体的なストリーム実装がそれを拡張します。以下に例を示します。

    設定可能なオプションの完全なリストについては、リファレンスドキュメントを参照してください。
    """

    # TODO: URLベースを埋める。必須。
    url_base = "https://api.le-systeme-solaire.net/rest/"
    page = 1

    def __init__(self, config: Mapping[str, Any], **kwargs):
        super().__init__(**kwargs)
        self.page_size = config["page_size"]

    def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
        """
        TODO: このメソッドをオーバーライドしてページネーション戦略を定義します。ページネーションを使用しない場合は、アクションは必要ありません - 単にNoneを返します。

        このメソッドは、ページネーションリクエストを行うために必要な情報を含むマッピング(例:dict)を返す必要があります。この辞書は、このクラス内の他のメソッドに渡され、
        ヘッダー、リクエストボディ、クエリパラメータなどを形成するのに役立ちます。

        例えば、APIが結果のどのページを返すかを決定するために「page」パラメータを受け入れ、APIからのレスポンスに「page」番号が含まれている場合、
        このメソッドはおそらく{'page': response.json()['page'] + 1}という辞書を返す必要があります。request_paramsメソッドは次に、入力next_page_tokenを読み取り、
        'page'パラメータをnext_page_token['page']に設定します。

        :param response: APIからの最新のレスポンス
        :return 結果に別のページがある場合、レスポンス内の次のページをクエリするために必要な情報を含むマッピング(例:dict)を返します。
                結果にもうページがない場合は、Noneを返します。
        """
        return None

    def request_params(
        self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None
    ) -> MutableMapping[str, Any]:
        """
        TODO: このメソッドをオーバーライドして設定するクエリパラメータを定義します。リクエストパラメータを定義する必要がない場合は、このメソッドを削除します。
        通常、共通のパラメータ(例:ページネーションサイズなど)を含みます。
        """
        return {}

    def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
        """
        TODO: このメソッドをオーバーライドしてレスポンスの解析方法を定義します。
        :return レスポンス内の各レコードを含むイテラブルを返します
        """
        yield response.json()


class Bodies(SolarStream):

    primary_key = 'id'

    def path(self, *, stream_state: Mapping[str, Any] = None, stream_slice: Mapping[str, Any] = None,
             next_page_token: Mapping[str, Any] = None) -> str:
        return "bodies"

    def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
        response_data = response.json()
        if len(response_data["bodies"]) == self.page_size:
            self.page += 1
            return {"page": f"{self.page}, {self.page_size}"}

    def request_params(
        self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None
    ) -> MutableMapping[str, Any]:
        """
        TODO: このメソッドをオーバーライドして設定するクエリパラメータを定義します。リクエストパラメータを定義する必要がない場合は、このメソッドを削除します。
        通常、共通のパラメータ(例:ページネーションサイズなど)を含みます。
        """
        params = {
            "order": "id,desc",
            "page": f"{self.page}, {self.page_size}"
        }
        if next_page_token:
            params.update(next_page_token)
        return params
  1. このクラスをインスタンス化します。
from airbyte_cdk.sources.streams.http.auth import NoAuth

class SourceSolar(AbstractSource):
    def check_connection(self, logger, config) -> Tuple[bool, any]:
        """
        https://github.com/airbytehq/airbyte/blob/master/airbyte-integrations/connectors/source-stripe/source_stripe/source.py#L232
        の例を参照してください。

        :param config:  コネクタのspec.yamlに準拠したユーザー入力構成オブジェクト
        :param logger:  ロガーオブジェクト
        :return Tuple[bool, any]: (True, None)は、入力構成がAPIに正常に接続できる場合、(False, error)はそれ以外の場合。
        """
        resp = requests.get(SolarStream.url_base)
        status = resp.status_code
        logger.info(f"Ping response code: {status}")
        if status == 200:
            return True, None
        else:
            message = resp.text
        return False, message

    def streams(self, config: Mapping[str, Any]) -> List[Stream]:
        """
        TODO: 以下のストリームを自分のストリームに置き換えます。

        :param config: ユーザー入力構成のマッピング、コネクタ仕様に定義されています。
        """
        # TODO: 認証が必要ない場合は、認証者を削除します。
        auth = NoAuth()
        return [Bodies(authenticator=auth, config=config)]
  1. ./source-solar/schemasbodies.jsonを新規作成します。
{
  "type": "object",
  "required": [
    "page_size"
  ],
  "properties": {
    "bodies": {
      "type": "array",
      "items": {
        "type": "object",
        "properties": {
          "id": {
            "type": "string"
          },
          "name": {
            "type": "string"
          },
          "englishName": {
            "type": "string"
          },
          "isPlanet": {
            "type": "boolean"
          },
          "moons": {
            "type": "array",
            "items": {
              "type": "object",
              "properties": {
                "moon": {
                  "type": "string"
                },
                "rel": {
                  "type": "string"
                }
              }
            }
          },
          "semimajorAxis": {
            "type": "number"
          },
          "perihelion": {
            "type": "number"
          },
          "aphelion": {
            "type": "number"
          },
          "eccentricity": {
            "type": "number"
          },
          "inclination": {
            "type": "number"
          },
          "mass": {
            "type": "object",
            "properties": {
              "massValue": {
                "type": "number"
              },
              "massExponent": {
                "type": "integer"
              }
            }
          },
          "vol": {
            "type": "object",
            "properties": {
              "volValue": {
                "type": "number"
              },
              "volExponent": {
                "type": "integer"
              }
            }
          },
          "density": {
            "type": "number"
          },
          "gravity": {
            "type": "number"
          },
          "escape": {
            "type": "number"
          },
          "meanRadius": {
            "type": "number"
          },
          "equaRadius": {
            "type": "number"
          },
          "polarRadius": {
            "type": "number"
          },
          "flattening": {
            "type": "number"
          },
          "dimension": {
            "type": "string"
          },
          "sideralOrbit": {
            "type": "number"
          },
          "sideralRotation": {
            "type": "number"
          },
          "aroundPlanet": {
            "type": "object",
            "properties": {
              "planet": {
                "type": "string"
              },
              "rel": {
                "type": "string"
              }
            }
          },
          "discoveredBy": {
            "type": "string"
          },
          "discoveryDate": {
            "type": "string"
          },
          "alternativeName": {
            "type": "string"
          },
          "axialTilt": {
            "type": "number"
          },
          "avgTemp": {
            "type": "number"
          },
          "mainAnomaly": {
            "type": "number"
          },
          "argPeriapsis": {
            "type": "number"
          },
          "longAscNode": {
            "type": "number"
          },
          "bodyType": {
            "type": "string"
          },
          "rel": {
            "type": "string"
          }
        }
      }
    }
  }
}
  1. チェックを実行します。
python main.py discover --config ./sample_files/secrets/config.json

正しい出力は次のとおりです。

{"type": "CATALOG", "catalog": {"streams": [{"name": "bodies", "json_schema": {"type": "object", "required": ["page_size"], "properties": {"bodies": {"type": "array", "items": {"type": "object", "properties": {"id": {"type": "string"}, "name": {"type": "string"}, "englishName": {"type": "string"}, "isPlanet": {"type": "boolean"}, "moons": {"type": "array", "items": {"type": "object", "properties": {"moon": {"type": "string"}, "rel": {"type": "string"}}}}, "semimajorAxis": {"type": "number"}, "perihelion": {"type": "number"}, "aphelion": {"type": "number"}, "eccentricity": {"type": "number"}, "inclination": {"type": "number"}, "mass": {"type": "object", "properties": {"massValue": {"type": "number"}, "massExponent": {"type": "integer"}}}, "vol": {"type": "object", "properties": {"volValue": {"type": "number"}, "volExponent": {"type": "integer"}}}, "density": {"type": "number"}, "gravity": {"type": "number"}, "escape": {"type": "number"}, "meanRadius": {"type": "number"}, "equaRadius": {"type": "number"}, "polarRadius": {"type": "number"}, "flattening": {"type": "number"}, "dimension": {"type": "string"}, "sideralOrbit": {"type": "number"}, "sideralRotation": {"type": "number"}, "aroundPlanet": {"type": "object", "properties": {"planet": {"type": "string"}, "rel": {"type": "string"}}}, "discoveredBy": {"type": "string"}, "discoveryDate": {"type": "string"}, "alternativeName": {"type": "string"}, "axialTilt": {"type": "number"}, "avgTemp": {"type": "number"}, "mainAnomaly": {"type": "number"}, "argPeriapsis": {"type": "number"}, "longAscNode": {"type": "number"}, "bodyType": {"type": "string"}, "rel": {"type": "string"}}}}}}, "supported_sync_modes": ["full_refresh"], "source_defined_primary_key": [["id"]]}]}}

データを読み取る#

読み取り構成を作成します。

vi ./sample_files/configured_catalog.json
{
  "streams": [
    {
      "stream": {
        "name": "bodies",
        "json_schema": {},
        "supported_sync_modes": [
          "full_refresh"
        ],
        "source_defined_primary_key": [
          [
            "id"
          ]
        ]
      },
      "sync_mode": "full_refresh",
      "destination_sync_mode": "overwrite"
    }
  ]
}

データ読み取りを実行します。

python main.py read --config ./sample_files/secrets/config.json --catalog ./sample_files/configured_catalog.json

出力は次のとおりです。

{"type": "LOG", "log": {"level": "INFO", "message": "Starting syncing SourceSolar"}}
{"type": "LOG", "log": {"level": "INFO", "message": "Syncing stream: bodies "}}
{"type": "RECORD", "record": {"stream": "bodies", "data": {"bodies": [{"id": "ymir", "name": "Ymir", "englishName": "Ymir", "isPlanet": false, "moons": null, "semimajorAxis": 23040000, "perihelion": 0, "aphelion": 0, "eccentricity": 0.187, "inclination": 167.9, "mass": {"massValue": 3.97, "massExponent": 15}, "vol": null, "density": 1.0, "gravity": 0.0, "escape": 0.0, "meanRadius": 9.0, "equaRadius": 0.0, "polarRadius": 0.0, "flattening": 0.0, "dimension": "", "sideralOrbit": 1315.4, "sideralRotation": 0.0, "aroundPlanet": {"planet": "saturne", "rel": "https://api.le-systeme-solaire.net/rest/bodies/saturne"}, "discoveredBy": "Brett J. Gladman", "discoveryDate": "07/08/2000", "alternativeName": "S/2000 S 1", "axialTilt": 0, "avgTemp": 0, "mainAnomaly": 0.0, "argPeriapsis": 0.0, "longAscNode": 0.0, "bodyType": "Moon", "rel": "https://api.le-systeme-solaire.net/rest/bodies/ymir"}, {"id": "weywot", "name": "Weywot", "englishName": "Weywot", "isPlanet": false, "moons": null, "semimajorAxis": 14500, "perihelion": 12470, "aphelion": 16530, "eccentricity": 0.148, "inclination": 14.0, "mass": null, "vol": null, "density": 1.0, "gravity": 0.0, "escape": 0.0, "meanRadius": 170.0, "equaRadius": 0.0, "polarRadius": 0.0, "flattening": 0.0, "dimension": "", "sideralOrbit": 12.438, "sideralRotation": 0.0, "aroundPlanet": {"planet": "quaoar", "rel": "https://api.le-systeme-solaire.net/rest/bodies/quaoar"}, "discoveredBy": "Michael E. Brown, T.A. Suer", "discoveryDate": "22/02/2007", "alternativeName": "S/2006 (50000) 1", "axialTilt": 0, "avgTemp": 0, "mainAnomaly": 0.0, "argPeriapsis": 0.0, "longAscNode": 0.0, "bodyType": "Moon", "rel": "https://api.le-systeme-solaire.net/rest/bodies/weywot"}, {"id": "vesta", "name": "(4) Vesta", "englishName": "4 Vesta", "isPlanet": false, "moons": null, "semimajorAxis": 353343000, "perihelion": 321767000, "aphelion": 384920000, "eccentricity": 0.0893, "inclination": 7.1337, "mass": {"massValue": 2.7, "massExponent": 20}, "vol": null, "density": 1.0, "gravity": 0.0, "escape": 0.0, "meanRadius": 265.0, "equaRadius": 0.0, "polarRadius": 0.0, "flattening": 0.0, "dimension": "560x544x454", "sideralOrbit": 1325.886, "sideralRotation": 0.0, "aroundPlanet": null, "discoveredBy": "H. W. Olbers", "discoveryDate": "29/03/1807", "alternativeName": "", "axialTilt": 0, "avgTemp": 0, "mainAnomaly": 0.0, "argPeriapsis": 0.0, "longAscNode": 0.0, "bodyType": "Asteroid", "rel": "https://api.le-systeme-solaire.net/rest/bodies/vesta"}, {"id": "venus", "name": "Vénus", "englishName": "Venus", "isPlanet": true, "moons": null, "semimajorAxis": 108208475, "perihelion": 107477000, "aphelion": 108939000, "eccentricity": 0.0067, "inclination": 3.39, "mass": {"massValue": 4.86747, "massExponent": 24}, "vol": {"volValue": 9.2843, "volExponent": 11}, "density": 5.243, "gravity": 8.87, "escape": 10360.0, "meanRadius": 6051.8, "equaRadius": 6051.8, "polarRadius": 6051.8, "flattening": 0.0, "dimension": "", "sideralOrbit": 224.701, "sideralRotation": -5832.5, "aroundPlanet": null, "discoveredBy": "", "discoveryDate": "", "alternativeName": "", "axialTilt": 177.36, "avgTemp": 737, "mainAnomaly": 50.115, "argPeriapsis": 54.78, "longAscNode": 76.785, "bodyType": "Planet", "rel": "https://api.le-systeme-solaire.net/rest/bodies/venus"}, {"id": "varuna", "name": "(20000) Varuna", "englishName": "20000 Varuna", "isPlanet": false, "moons": null, "semimajorAxis": 6451398000, "perihelion": 6120810000, "aphelion": 6781985000, "eccentricity": 0.051, "inclination": 17.158, "mass": {"massValue": 1.55, "massExponent": 20}, "vol": null, "density": 1.0, "gravity": 0.0, "escape": 0.0, "meanRadius": 330.0, "equaRadius": 0.0, "polarRadius": 0.0, "flattening": 0.0, "dimension": "", "sideralOrbit": 103440.0, "sideralRotation": 0.0, "aroundPlanet": null, "discoveredBy": "Robert S. McMillan Spacewatch", "discoveryDate": "28/11/2000", "alternativeName": "2000 WR106", "axialTilt": 0, "avgTemp": 0, "mainAnomaly": 0.0, "argPeriapsis": 0.0, "longAscNode": 0.0, "bodyType": "Asteroid", "rel": "https://api.le-systeme-solaire.net/rest/bodies/varuna"}, {"id": "vanth", "name": "Vanth", "englishName": "Vanth", "isPlanet": false, "moons": null, "semimajorAxis": 9000, "perihelion": 0, "aphelion": 0, "eccentricity": 0.0009, "inclination": 105.03, "mass": {"massValue": 3.6, "massExponent": 19}, "vol": null, "density": 1.0, "gravity": 0.0, "escape": 0.0, "meanRadius": 434.0, "equaRadius": 0.0, "polarRadius": 0.0, "flattening": 0.0, "dimension": "", "sideralOrbit": 9.539, "sideralRotation": 0.0, "aroundPlanet": {"planet": "orcus", "rel": "https://api.le-systeme-solaire.net/rest/bodies/orcus"}, "discoveredBy": "Michael E. Brown, T.A. Suer", "discoveryDate": "13/11/2005", "alternativeName": "S/2005 (90482) 1", "axialTilt": 0, "avgTemp": 0, "mainAnomaly": 0.0, "argPeriapsis": 0.0, "longAscNode": 0.0, "bodyType": "Moon", "rel": "https://api.le-systeme-solaire.net/rest/bodies/vanth"}, {"id": "valetudo", "name": "Valétudo", "englishName": "Valetudo", "isPlanet": false, "moons": null, "semimajorAxis": 18928000, "perihelion": 0, "aphelion": 0, "eccentricity": 0.222, "inclination": 34.0, "mass": null, "vol": null, "density": 1.0, "gravity": 0.0, "escape": 0.0, "meanRadius": 0.5, "equaRadius": 0.0, "polarRadius": 0.0, "flattening": 0.0, "dimension": "", "sideralOrbit": 533.0, "sideralRotation": 0.0, "aroundPlanet": {"planet": "jupiter", "rel": "https://api.le-systeme-solaire.net/rest/bodies/jupiter"}, "discoveredBy": "Scott Sheppard", "discoveryDate": "17/07/2018", "alternativeName": "S/2016 J 2", "axialTilt": 0, "avgTemp": 0, "mainAnomaly": 0.0, "argPeriapsis": 0.0, "longAscNode": 0.0, "bodyType": "Moon", "rel": "https://api.le-systeme-solaire.net/rest/bodies/valetudo"}, {"id": "uranus", "name": "Uranus", "englishName": "Uranus", "isPlanet": true, "moons": [{"moon": "Ariel", "rel": "https://api.le-systeme-solaire.net/rest/bodies/ariel"}, {"moon": "Umbriel", "rel": "https://api.le-systeme-solaire.net/rest/bodies/umbriel"}, {"moon": "Titania", "rel": "https://api.le-systeme-solaire.net/rest/bodies/titania"}, {"moon": "Obéron", "rel": "https://api.le-systeme-solaire.net/rest/bodies/oberon"}, {"moon": "Miranda", "rel": "https://api.le-systeme-solaire.net/rest/bodies/miranda"}, {"moon": "Cordélia", "rel": "https://api.le-systeme-solaire.net/rest/bodies/cordelia"}, {"moon": "Ophélie", "rel": "https://api.le-systeme-solaire.net/rest/bodies/ophelia"}, {"moon": "Bianca", "rel": "https://api.le-systeme-solaire.net/rest/bodies/bianca"}, {"moon": "Cressida", "rel": "https://api.le-systeme-solaire.net/rest/bodies/cressida"}, {"moon": "Desdémone", "rel": "https://api.le-systeme-solaire.net/rest/bodies/desdemona"}, {"moon": "Juliette", "rel": "https://api.le-systeme-solaire.net/rest/bodies/juliet"}, {"moon": "Portia", "rel": "https://api.le-systeme-solaire.net/rest/bodies/portia"}, {"moon": "Rosalinde", "rel": "https://api.le-systeme-solaire.net/rest/bodies/rosalind"}, {"moon": "Belinda", "rel": "https://api.le-systeme-solaire.net/rest/bodies/belinda"}, {"moon": "Puck", "rel": "https://api.le-systeme-solaire.net/rest/bodies/puck"}, {"moon": "Caliban", "rel": "https://api.le-systeme-solaire.net/rest/bodies/caliban"}, {"moon": "Sycorax", "rel": "https://api.le-systeme-solaire.net/rest/bodies/sycorax"}, {"moon": "Prospero", "rel": "https://api.le-systeme-solaire.net/rest/bodies/prospero"}, {"moon": "Setebos", "rel": "https://api.le-systeme-solaire.net/rest/bodies/setebos"}, {"moon": "Stephano", "rel": "https://api.le-systeme-solaire.net/rest/bodies/stephano"}, {"moon": "Trinculo", "rel": "https://api.le-systeme-solaire.net/rest/bodies/trinculo"}, {"moon": "Francisco", "rel": "https://api.le-systeme-solaire.net/rest/bodies/francisco"}, {"moon": "Margaret", "rel": "https://api.le-systeme-solaire.net/rest/bodies/margaret"}, {"moon": "Ferdinand", "rel": "https://api.le-systeme-solaire.net/rest/bodies/ferdinand"}, {"moon": "Perdita", "rel": "https://api.le-systeme-solaire.net/rest/bodies/perdita"}, {"moon": "Mab", "rel": "https://api.le-systeme-solaire.net/rest/bodies/mab"}, {"moon": "Cupid", "rel": "https://api.le-systeme-solaire.net/rest/bodies/cupid"}], "semimajorAxis": 2870658186, "perihelion": 2734998229, "aphelion": 3006318143, "eccentricity": 0.0457, "inclination": 0.772, "mass": {"massValue": 8.68127, "massExponent": 25}, "vol": {"volValue": 6.833, "volExponent": 13}, "density": 1.27, "gravity": 8.87, "escape": 21380.0, "meanRadius": 25362.0, "equaRadius": 25559.0, "polarRadius": 24973.0, "flattening": 0.02293, "dimension": "", "sideralOrbit": 30685.4, "sideralRotation": -17.24, "aroundPlanet": null, "discoveredBy": "William Herschel", "discoveryDate": "13/03/1781", "alternativeName": "", "axialTilt": 97.77, "avgTemp": 76, "mainAnomaly": 142.2386, "argPeriapsis": 98.862, "longAscNode": 73.967, "bodyType": "Planet", "rel": "https://api.le-systeme-solaire.net/rest/bodies/uranus"}, {"id": "umbriel", "name": "Umbriel", "englishName": "Umbriel", "isPlanet": false, "moons": null, "semimajorAxis": 266000, "perihelion": 265100, "aphelion": 267500, "eccentricity": 0.0039, "inclination": 0.13, "mass": {"massValue": 12.2, "massExponent": 20}, "vol": null, "density": 1.46, "gravity": 0.0, "escape": 0.0, "meanRadius": 584.7, "equaRadius": 0.0, "polarRadius": 0.0, "flattening": 0.0, "dimension": "", "sideralOrbit": 4.14418, "sideralRotation": 99.499, "aroundPlanet": {"planet": "uranus", "rel": "https://api.le-systeme-solaire.net/rest/bodies/uranus"}, "discoveredBy": "William Lassell", "discoveryDate": "24/10/1851", "alternativeName": "", "axialTilt": 0, "avgTemp": 0, "mainAnomaly": 0.0, "argPeriapsis": 0.0, "longAscNode": 0.0, "bodyType": "Moon", "rel": "https://api.le-systeme-solaire.net/rest/bodies/umbriel"}, {"id": "triton", "name": "Triton", "englishName": "Triton", "isPlanet": false, "moons": null, "semimajorAxis": 354760, "perihelion": 354753, "aphelion": 354765, "eccentricity": 2e-05, "inclination": 157.345, "mass": {"massValue": 2.14, "massExponent": 22}, "vol": null, "density": 2.05, "gravity": 0.78, "escape": 0.0, "meanRadius": 1353.4, "equaRadius": 0.0, "polarRadius": 0.0, "flattening": 0.0, "dimension": "", "sideralOrbit": 5.87685, "sideralRotation": 141.0444, "aroundPlanet": {"planet": "neptune", "rel": "https://api.le-systeme-solaire.net/rest/bodies/neptune"}, "discoveredBy": "William Lassell", "discoveryDate": "10/10/1846", "alternativeName": "", "axialTilt": 0, "avgTemp": 0, "mainAnomaly": 0.0, "argPeriapsis": 0.0, "longAscNode": 0.0, "bodyType": "Moon", "rel": "https://api.le-systeme-solaire.net/rest/bodies/triton"}]}, "emitted_at": 1656152784004}}

...

{"type": "LOG", "log": {"level": "INFO", "message": "Read 29 records from bodies stream"}}
{"type": "LOG", "log": {"level": "INFO", "message": "Finished syncing bodies"}}
{"type": "LOG", "log": {"level": "INFO", "message": "SourceSolar runtimes:\nSyncing stream bodies 0:00:35.475559"}}
{"type": "LOG", "log": {"level": "INFO", "message": "Finished syncing SourceSolar"}}

依存関係を処理する#

  1. HttpSubStream クラスを作成します。
class SolarSubStream(HttpSubStream, SolarStream, ABC):
    # 無効なリンクをスキップします
    raise_on_http_errors = False

    def __init__(self, parent: SolarStream, **kwargs):
        super().__init__(parent=parent, **kwargs)

    def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
        """
        TODO: このメソッドをオーバーライドしてレスポンスの解析方法を定義します。
        :return レスポンス内の各レコードを含むイテラブルを返します
        """
        yield response.json()
  1. 詳細インターフェースクラスを作成します。
class DetailOfBodies(SolarSubStream):
    primary_key = 'id'

    def path(self, *, stream_state: Mapping[str, Any] = None, stream_slice: Mapping[str, Any] = None,
             next_page_token: Mapping[str, Any] = None) -> str:
        return f'bodies/{stream_slice["id"]}'

    def stream_slices(self, **kwargs) -> Iterable[Optional[Mapping[str, any]]]:
        for stream_slices in self.parent.stream_slices(sync_mode=SyncMode.full_refresh):
            for record in self.parent.read_records(sync_mode=SyncMode.full_refresh, stream_slice=stream_slices):
                for data in record["bodies"]:
                    yield {"id": data["id"]}

    def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
        if response.status_code == 200:
            yield response.json()
        else:
            self.logger.warn(f"Get data failed: code={response.status_code}, url={response.url}, message={response.text}")
  1. 詳細インターフェースをインスタンス化します。
class SourceSolar(AbstractSource):
    def check_connection(self, logger, config) -> Tuple[bool, any]:
        """
        https://github.com/airbytehq/airbyte/blob/master/airbyte-integrations/connectors/source-stripe/source_stripe/source.py#L232
        の例を参照してください。

        :param config:  コネクタのspec.yamlに準拠したユーザー入力構成オブジェクト
        :param logger:  ロガーオブジェクト
        :return Tuple[bool, any]: (True, None)は、入力構成がAPIに正常に接続できる場合、(False, error)はそれ以外の場合。
        """
        resp = requests.get(SolarStream.url_base)
        status = resp.status_code
        logger.info(f"Ping response code: {status}")
        if status == 200:
            return True, None
        else:
            message = resp.text
        return False, message

    def streams(self, config: Mapping[str, Any]) -> List[Stream]:
        """
        TODO: 以下のストリームを自分のストリームに置き換えます。

        :param config: ユーザー入力構成のマッピング、コネクタ仕様に定義されています。
        """
        # TODO: 認証が必要ない場合は、認証者を削除します。
        auth = NoAuth()
        return [Bodies(authenticator=auth, config=config),
                DetailOfBodies(parent=Bodies(authenticator=auth, config=config), authenticator=auth, config=config)]
  1. レスポンス構造を宣言します。
vi ./source_solar/schemas/detail_of_bodies.json
{
  "type": "object",
  "required": [
    "page_size"
  ],
  "properties": {
    "id": {
      "type": "string"
    },
    "name": {
      "type": "string"
    },
    "englishName": {
      "type": "string"
    },
    "isPlanet": {
      "type": "boolean"
    },
    "moons": {
      "type": "array",
      "items": {
        "type": "object",
        "properties": {
          "moon": {
            "type": "string"
          },
          "rel": {
            "type": "string"
          }
        }
      }
    },
    "semimajorAxis": {
      "type": "number"
    },
    "perihelion": {
      "type": "number"
    },
    "aphelion": {
      "type": "number"
    },
    "eccentricity": {
      "type": "number"
    },
    "inclination": {
      "type": "number"
    },
    "mass": {
      "type": "object",
      "properties": {
        "massValue": {
          "type": "number"
        },
        "massExponent": {
          "type": "integer"
        }
      }
    },
    "vol": {
      "type": "object",
      "properties": {
        "volValue": {
          "type": "number"
        },
        "volExponent": {
          "type": "integer"
        }
      }
    },
    "density": {
      "type": "number"
    },
    "gravity": {
      "type": "number"
    },
    "escape": {
      "type": "number"
    },
    "meanRadius": {
      "type": "number"
    },
    "equaRadius": {
      "type": "number"
    },
    "polarRadius": {
      "type": "number"
    },
    "flattening": {
      "type": "number"
    },
    "dimension": {
      "type": "string"
    },
    "sideralOrbit": {
      "type": "number"
    },
    "sideralRotation": {
      "type": "number"
    },
    "aroundPlanet": {
      "type": "object",
      "properties": {
        "planet": {
          "type": "string"
        },
        "rel": {
          "type": "string"
        }
      }
    },
    "discoveredBy": {
      "type": "string"
    },
    "discoveryDate": {
      "type": "string"
    },
    "alternativeName": {
      "type": "string"
    },
    "axialTilt": {
      "type": "number"
    },
    "avgTemp": {
      "type": "number"
    },
    "mainAnomaly": {
      "type": "number"
    },
    "argPeriapsis": {
      "type": "number"
    },
    "longAscNode": {
      "type": "number"
    },
    "bodyType": {
      "type": "string"
    }
  }
}
  1. 同期構成./sample_files/configured_catalog.jsonを更新します。
{
  "streams": [
    {
      "stream": {
        "name": "bodies",
        "json_schema": {},
        "supported_sync_modes": [
          "full_refresh"
        ],
        "source_defined_primary_key": [
          [
            "id"
          ]
        ]
      },
      "sync_mode": "full_refresh",
      "destination_sync_mode": "overwrite"
    },
    {
      "stream": {
        "name": "detail_of_bodies",
        "json_schema": {},
        "supported_sync_modes": [
          "full_refresh"
        ],
        "source_defined_primary_key": [
          [
            "id"
          ]
        ]
      },
      "sync_mode": "full_refresh",
      "destination_sync_mode": "overwrite"
    }
  ]
}
  1. データ読み取りをテストします。
python main.py read --config ./sample_files/secrets/config.json --catalog ./sample_files/configured_catalog.json

最終的な出力は 284 件のレコードを読み取ったことを示します。

{"type": "LOG", "log": {"level": "INFO", "message": "Read 284 records from detail_of_bodies stream"}}
{"type": "LOG", "log": {"level": "INFO", "message": "Finished syncing detail_of_bodies"}}
{"type": "LOG", "log": {"level": "INFO", "message": "SourceSolar runtimes:\nSyncing stream bodies 0:00:51.038142\nSyncing stream detail_of_bodies 0:01:41.687393"}}
{"type": "LOG", "log": {"level": "INFO", "message": "Finished syncing SourceSolar"}}

カスタムコネクタをデプロイする#

  1. コネクタの Docker イメージを作成します。
docker build . -t airbyte/source-solar:dev
docker images

表示される内容:

REPOSITORY                     TAG             IMAGE ID       CREATED          SIZE
airbyte/source-solar           dev             b42429012ce0   22 seconds ago   120MB
  1. http://localhost:8000 / を開いて Airbyte ページにアクセスし、新しいコネクタを作成します。

image-20220626210721477

image-20220626211703757

作成後、コネクタリストに表示されます。

image-20220626211844654

カスタムコネクタを使用する#

新しい接続を作成します。

image-20220626212028444

image-20220626212107982

image-20220626212142644

同期を実行します。

image-20221203202150388

同期に成功しました。

image-20221203202516832

データベースを確認します。

image-20221203202717347

選択した Normalized tabular data のため、同期中に JSON オブジェクトがすべて展開されるため、複数のテーブルが作成されます。

image-20221203202815537

その中で solar_bodies は raw_data に対応し、bodies の列は最も原始的なレスポンスデータです。

image-20221203203008198

まとめ#

これで、Airbyte の全体的な入門チュートリアルが終了しました。他の ETL ツールと比較して、Airbyte は API ドキュメントに基づいて明確にサービスを提供し、ETL プロセスを実行できます。dbt と組み合わせることで ELT プロセスも実現でき、全過程で Python スクリプトを記述するだけで済むため、柔軟性が非常に高いです。Airbyte はまだ初期段階にありますが、将来が楽しみです。

参考資料#

読み込み中...
文章は、創作者によって署名され、ブロックチェーンに安全に保存されています。