Airbyte は 2020 年 7 月に発表された最新のオープンソース ETL ツールの一つです。他の ETL ツールとは異なり、Airbyte は UI と API を通じて即使用可能なコネクタを提供し、コミュニティの開発者がこのツールを監視および維持できるようにしています。コネクタは Docker コンテナとして実行され、選択した言語で構築できます。モジュール式コンポーネントとオプション機能のサブセットを提供することで、Airbyte はより大きな柔軟性を提供します。
クイックスタート#
ローカルに Airbyte をデプロイする#
git clone https://github.com/airbytehq/airbyte.git
cd airbyte
docker-compose up
起動後にアクセス: http://localhost:8000
データソースを追加する#
ターゲットソースを追加する#
まず、docker-compose を使用してローカルで Postgres データベースを起動します。
-
mkdir ./data
-
docker-compose.yaml を作成します:
version: "3" services: postgres: image: postgres:14 container_name: postgres restart: always environment: POSTGRES_DB: postgres POSTGRES_USER: postgres POSTGRES_PASSWORD: 123456 ports: - 5432:5432 volumes: - ./data:/var/lib/postgresql/data
-
docker-compose up -d
を実行して起動します。
次に、Airbyte でターゲットソースを設定します。
接続を確立する#
同期をトリガーする#
結果を確認する#
Airbyte CDK を使用してカスタム API データソースを開発する#
データソーステンプレートを作成する#
cd airbyte-integrations/connector-templates/generator # Airbyteプロジェクトのルートから開始することを前提としています。
# NPMがない場合は、https://www.npmjs.com/get-npmからインストールしてください
./generate.sh
Python HTTP API Source
を選択し、データソース名を入力します。ここではsola
を使用します。
Python 依存関係をインストールする#
cd ../../connectors/source-sola
python -m venv .venv # .venvディレクトリに仮想環境を作成します
source .venv/bin/activate # venvを有効にします
pip install -r requirements.txt
サンプルを実行する#
python main.py spec
以下の内容が出力されれば、プロジェクトの初期化が成功したことを示します。
{"type": "SPEC", "spec": {"documentationUrl": "https://docsurl.com", "connectionSpecification": {"$schema": "http://json-schema.org/draft-07/schema#", "title": "Solar Spec", "type": "object", "required": ["TODO"], "additionalProperties": false, "properties": {"TODO": {"type": "string", "description": "describe me"}}}}}
入力パラメータを定義する#
これを実現する最も簡単な方法は、spec.yaml
ファイルを作成し、source_<name>/spec.yaml
にConnectorSpecificationスキーマに従ってコネクタの入力を記述します。ソースコードを開発する際の良い出発点です。JsonSchema を使用して、入力が何であるか(例:ユーザー名とパスワード)を定義します。
vi ./source_solar/spec.yaml
documentationUrl: https://api.le-systeme-solaire.net/swagger/#/
connectionSpecification:
$schema: http://json-schema.org/draft-07/schema#
title: Solar System openData
type: object
required:
- page_size
additionalProperties: false
properties:
page_size:
type: integer
title: Page size
description: ページサイズ
default: 10
minimum: 1
pattern: \d+
接続の健康チェック#
この操作は、ユーザーが提供した入力構成が基盤となるデータソースに接続できるかどうかを検証します。ユーザーが提供した構成には、spec.yaml
に記載された値が含まれています。API にリクエストを送信するには、アクセス権を指定する必要があります。この例では、API は資格情報を必要としないため、かなり簡単なチェックです。./source_solar/source.py
を編集します:
class SolarStream(HttpStream, ABC):
"""
TODO このコメントを削除する
このクラスはコネクタによって出力されるストリームを表します。
これは、APIレベルでのすべての共通機能を含むことを目的とした抽象基底クラスです。例えば、APIの基本URL、ページネーション戦略、
レスポンスの解析などです。
各ストリームは、このクラス(またはその抽象サブクラス)を拡張して、そのストリームに特有の動作を指定する必要があります。
通常、REST APIの場合、各ストリームはAPI内のリソースに対応します。例えば、APIが次のエンドポイントを含む場合
- GET v1/customers
- GET v1/employees
その場合、次の3つのクラスを持つ必要があります:
`class SolarStream(HttpStream, ABC)`は現在のクラスです。
`class Customers(SolarStream)`は、v1/customersを使用して顧客のデータを取得する動作を含みます。
`class Employees(SolarStream)`は、v1/employeesを使用して従業員のデータを取得する動作を含みます。
一部のストリームが増分同期を実装する場合、通常は別のクラスを作成します。
`class IncrementalSolarStream((SolarStream), ABC)`を作成し、具体的なストリーム実装がそれを拡張します。以下に例を示します。
設定可能なオプションの完全なリストについては、リファレンスドキュメントを参照してください。
"""
# TODO: URLベースを埋める。必須。
url_base = "https://api.le-systeme-solaire.net/rest/"
...
class SourceSolar(AbstractSource):
def check_connection(self, logger, config) -> Tuple[bool, any]:
"""
https://github.com/airbytehq/airbyte/blob/master/airbyte-integrations/connectors/source-stripe/source_stripe/source.py#L232
の例を参照してください。
:param config: コネクタのspec.yamlに準拠したユーザー入力構成オブジェクト
:param logger: ロガーオブジェクト
:return Tuple[bool, any]: (True, None)は、入力構成がAPIに正常に接続できる場合、(False, error)はそれ以外の場合。
"""
resp = requests.get(SolarStream.url_base)
status = resp.status_code
logger.info(f"Ping response code: {status}")
if status == 200:
return True, None
else:
message = resp.text
return False, message
入力ファイルを作成し、チェックを実行します。
mkdir sample_files
echo '{"page_size": 10}' > ./sample_files/secrets/config.json
python main.py check --config ./sample_files/secrets/config.json
以下の内容が表示されれば、接続チェックが成功したことを示します。
{"type": "LOG", "log": {"level": "INFO", "message": "Ping response code: 200"}}
{"type": "LOG", "log": {"level": "INFO", "message": "Check succeeded"}}
{"type": "CONNECTION_STATUS", "connectionStatus": {"status": "SUCCEEDED"}}
データストリームを宣言する#
インターフェースのデータストリームを宣言するには、次の手順を実行するだけです。
./source-solar/source.py
に HttpStream クラスを追加します。
class SolarStream(HttpStream, ABC):
"""
TODO このコメントを削除する
このクラスはコネクタによって出力されるストリームを表します。
これは、APIレベルでのすべての共通機能を含むことを目的とした抽象基底クラスです。例えば、APIの基本URL、ページネーション戦略、
レスポンスの解析などです。
各ストリームは、このクラス(またはその抽象サブクラス)を拡張して、そのストリームに特有の動作を指定する必要があります。
通常、REST APIの場合、各ストリームはAPI内のリソースに対応します。例えば、APIが次のエンドポイントを含む場合
- GET v1/customers
- GET v1/employees
その場合、次の3つのクラスを持つ必要があります:
`class SolarStream(HttpStream, ABC)`は現在のクラスです。
`class Customers(SolarStream)`は、v1/customersを使用して顧客のデータを取得する動作を含みます。
`class Employees(SolarStream)`は、v1/employeesを使用して従業員のデータを取得する動作を含みます。
一部のストリームが増分同期を実装する場合、通常は別のクラスを作成します。
`class IncrementalSolarStream((SolarStream), ABC)`を作成し、具体的なストリーム実装がそれを拡張します。以下に例を示します。
設定可能なオプションの完全なリストについては、リファレンスドキュメントを参照してください。
"""
# TODO: URLベースを埋める。必須。
url_base = "https://api.le-systeme-solaire.net/rest/"
page = 1
def __init__(self, config: Mapping[str, Any], **kwargs):
super().__init__(**kwargs)
self.page_size = config["page_size"]
def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
"""
TODO: このメソッドをオーバーライドしてページネーション戦略を定義します。ページネーションを使用しない場合は、アクションは必要ありません - 単にNoneを返します。
このメソッドは、ページネーションリクエストを行うために必要な情報を含むマッピング(例:dict)を返す必要があります。この辞書は、このクラス内の他のメソッドに渡され、
ヘッダー、リクエストボディ、クエリパラメータなどを形成するのに役立ちます。
例えば、APIが結果のどのページを返すかを決定するために「page」パラメータを受け入れ、APIからのレスポンスに「page」番号が含まれている場合、
このメソッドはおそらく{'page': response.json()['page'] + 1}という辞書を返す必要があります。request_paramsメソッドは次に、入力next_page_tokenを読み取り、
'page'パラメータをnext_page_token['page']に設定します。
:param response: APIからの最新のレスポンス
:return 結果に別のページがある場合、レスポンス内の次のページをクエリするために必要な情報を含むマッピング(例:dict)を返します。
結果にもうページがない場合は、Noneを返します。
"""
return None
def request_params(
self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None
) -> MutableMapping[str, Any]:
"""
TODO: このメソッドをオーバーライドして設定するクエリパラメータを定義します。リクエストパラメータを定義する必要がない場合は、このメソッドを削除します。
通常、共通のパラメータ(例:ページネーションサイズなど)を含みます。
"""
return {}
def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
"""
TODO: このメソッドをオーバーライドしてレスポンスの解析方法を定義します。
:return レスポンス内の各レコードを含むイテラブルを返します
"""
yield response.json()
class Bodies(SolarStream):
primary_key = 'id'
def path(self, *, stream_state: Mapping[str, Any] = None, stream_slice: Mapping[str, Any] = None,
next_page_token: Mapping[str, Any] = None) -> str:
return "bodies"
def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
response_data = response.json()
if len(response_data["bodies"]) == self.page_size:
self.page += 1
return {"page": f"{self.page}, {self.page_size}"}
def request_params(
self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None
) -> MutableMapping[str, Any]:
"""
TODO: このメソッドをオーバーライドして設定するクエリパラメータを定義します。リクエストパラメータを定義する必要がない場合は、このメソッドを削除します。
通常、共通のパラメータ(例:ページネーションサイズなど)を含みます。
"""
params = {
"order": "id,desc",
"page": f"{self.page}, {self.page_size}"
}
if next_page_token:
params.update(next_page_token)
return params
- このクラスをインスタンス化します。
from airbyte_cdk.sources.streams.http.auth import NoAuth
class SourceSolar(AbstractSource):
def check_connection(self, logger, config) -> Tuple[bool, any]:
"""
https://github.com/airbytehq/airbyte/blob/master/airbyte-integrations/connectors/source-stripe/source_stripe/source.py#L232
の例を参照してください。
:param config: コネクタのspec.yamlに準拠したユーザー入力構成オブジェクト
:param logger: ロガーオブジェクト
:return Tuple[bool, any]: (True, None)は、入力構成がAPIに正常に接続できる場合、(False, error)はそれ以外の場合。
"""
resp = requests.get(SolarStream.url_base)
status = resp.status_code
logger.info(f"Ping response code: {status}")
if status == 200:
return True, None
else:
message = resp.text
return False, message
def streams(self, config: Mapping[str, Any]) -> List[Stream]:
"""
TODO: 以下のストリームを自分のストリームに置き換えます。
:param config: ユーザー入力構成のマッピング、コネクタ仕様に定義されています。
"""
# TODO: 認証が必要ない場合は、認証者を削除します。
auth = NoAuth()
return [Bodies(authenticator=auth, config=config)]
./source-solar/schemas
にbodies.json
を新規作成します。
{
"type": "object",
"required": [
"page_size"
],
"properties": {
"bodies": {
"type": "array",
"items": {
"type": "object",
"properties": {
"id": {
"type": "string"
},
"name": {
"type": "string"
},
"englishName": {
"type": "string"
},
"isPlanet": {
"type": "boolean"
},
"moons": {
"type": "array",
"items": {
"type": "object",
"properties": {
"moon": {
"type": "string"
},
"rel": {
"type": "string"
}
}
}
},
"semimajorAxis": {
"type": "number"
},
"perihelion": {
"type": "number"
},
"aphelion": {
"type": "number"
},
"eccentricity": {
"type": "number"
},
"inclination": {
"type": "number"
},
"mass": {
"type": "object",
"properties": {
"massValue": {
"type": "number"
},
"massExponent": {
"type": "integer"
}
}
},
"vol": {
"type": "object",
"properties": {
"volValue": {
"type": "number"
},
"volExponent": {
"type": "integer"
}
}
},
"density": {
"type": "number"
},
"gravity": {
"type": "number"
},
"escape": {
"type": "number"
},
"meanRadius": {
"type": "number"
},
"equaRadius": {
"type": "number"
},
"polarRadius": {
"type": "number"
},
"flattening": {
"type": "number"
},
"dimension": {
"type": "string"
},
"sideralOrbit": {
"type": "number"
},
"sideralRotation": {
"type": "number"
},
"aroundPlanet": {
"type": "object",
"properties": {
"planet": {
"type": "string"
},
"rel": {
"type": "string"
}
}
},
"discoveredBy": {
"type": "string"
},
"discoveryDate": {
"type": "string"
},
"alternativeName": {
"type": "string"
},
"axialTilt": {
"type": "number"
},
"avgTemp": {
"type": "number"
},
"mainAnomaly": {
"type": "number"
},
"argPeriapsis": {
"type": "number"
},
"longAscNode": {
"type": "number"
},
"bodyType": {
"type": "string"
},
"rel": {
"type": "string"
}
}
}
}
}
}
- チェックを実行します。
python main.py discover --config ./sample_files/secrets/config.json
正しい出力は次のとおりです。
{"type": "CATALOG", "catalog": {"streams": [{"name": "bodies", "json_schema": {"type": "object", "required": ["page_size"], "properties": {"bodies": {"type": "array", "items": {"type": "object", "properties": {"id": {"type": "string"}, "name": {"type": "string"}, "englishName": {"type": "string"}, "isPlanet": {"type": "boolean"}, "moons": {"type": "array", "items": {"type": "object", "properties": {"moon": {"type": "string"}, "rel": {"type": "string"}}}}, "semimajorAxis": {"type": "number"}, "perihelion": {"type": "number"}, "aphelion": {"type": "number"}, "eccentricity": {"type": "number"}, "inclination": {"type": "number"}, "mass": {"type": "object", "properties": {"massValue": {"type": "number"}, "massExponent": {"type": "integer"}}}, "vol": {"type": "object", "properties": {"volValue": {"type": "number"}, "volExponent": {"type": "integer"}}}, "density": {"type": "number"}, "gravity": {"type": "number"}, "escape": {"type": "number"}, "meanRadius": {"type": "number"}, "equaRadius": {"type": "number"}, "polarRadius": {"type": "number"}, "flattening": {"type": "number"}, "dimension": {"type": "string"}, "sideralOrbit": {"type": "number"}, "sideralRotation": {"type": "number"}, "aroundPlanet": {"type": "object", "properties": {"planet": {"type": "string"}, "rel": {"type": "string"}}}, "discoveredBy": {"type": "string"}, "discoveryDate": {"type": "string"}, "alternativeName": {"type": "string"}, "axialTilt": {"type": "number"}, "avgTemp": {"type": "number"}, "mainAnomaly": {"type": "number"}, "argPeriapsis": {"type": "number"}, "longAscNode": {"type": "number"}, "bodyType": {"type": "string"}, "rel": {"type": "string"}}}}}}, "supported_sync_modes": ["full_refresh"], "source_defined_primary_key": [["id"]]}]}}
データを読み取る#
読み取り構成を作成します。
vi ./sample_files/configured_catalog.json
{
"streams": [
{
"stream": {
"name": "bodies",
"json_schema": {},
"supported_sync_modes": [
"full_refresh"
],
"source_defined_primary_key": [
[
"id"
]
]
},
"sync_mode": "full_refresh",
"destination_sync_mode": "overwrite"
}
]
}
データ読み取りを実行します。
python main.py read --config ./sample_files/secrets/config.json --catalog ./sample_files/configured_catalog.json
出力は次のとおりです。
{"type": "LOG", "log": {"level": "INFO", "message": "Starting syncing SourceSolar"}}
{"type": "LOG", "log": {"level": "INFO", "message": "Syncing stream: bodies "}}
{"type": "RECORD", "record": {"stream": "bodies", "data": {"bodies": [{"id": "ymir", "name": "Ymir", "englishName": "Ymir", "isPlanet": false, "moons": null, "semimajorAxis": 23040000, "perihelion": 0, "aphelion": 0, "eccentricity": 0.187, "inclination": 167.9, "mass": {"massValue": 3.97, "massExponent": 15}, "vol": null, "density": 1.0, "gravity": 0.0, "escape": 0.0, "meanRadius": 9.0, "equaRadius": 0.0, "polarRadius": 0.0, "flattening": 0.0, "dimension": "", "sideralOrbit": 1315.4, "sideralRotation": 0.0, "aroundPlanet": {"planet": "saturne", "rel": "https://api.le-systeme-solaire.net/rest/bodies/saturne"}, "discoveredBy": "Brett J. Gladman", "discoveryDate": "07/08/2000", "alternativeName": "S/2000 S 1", "axialTilt": 0, "avgTemp": 0, "mainAnomaly": 0.0, "argPeriapsis": 0.0, "longAscNode": 0.0, "bodyType": "Moon", "rel": "https://api.le-systeme-solaire.net/rest/bodies/ymir"}, {"id": "weywot", "name": "Weywot", "englishName": "Weywot", "isPlanet": false, "moons": null, "semimajorAxis": 14500, "perihelion": 12470, "aphelion": 16530, "eccentricity": 0.148, "inclination": 14.0, "mass": null, "vol": null, "density": 1.0, "gravity": 0.0, "escape": 0.0, "meanRadius": 170.0, "equaRadius": 0.0, "polarRadius": 0.0, "flattening": 0.0, "dimension": "", "sideralOrbit": 12.438, "sideralRotation": 0.0, "aroundPlanet": {"planet": "quaoar", "rel": "https://api.le-systeme-solaire.net/rest/bodies/quaoar"}, "discoveredBy": "Michael E. Brown, T.A. Suer", "discoveryDate": "22/02/2007", "alternativeName": "S/2006 (50000) 1", "axialTilt": 0, "avgTemp": 0, "mainAnomaly": 0.0, "argPeriapsis": 0.0, "longAscNode": 0.0, "bodyType": "Moon", "rel": "https://api.le-systeme-solaire.net/rest/bodies/weywot"}, {"id": "vesta", "name": "(4) Vesta", "englishName": "4 Vesta", "isPlanet": false, "moons": null, "semimajorAxis": 353343000, "perihelion": 321767000, "aphelion": 384920000, "eccentricity": 0.0893, "inclination": 7.1337, "mass": {"massValue": 2.7, "massExponent": 20}, "vol": null, "density": 1.0, "gravity": 0.0, "escape": 0.0, "meanRadius": 265.0, "equaRadius": 0.0, "polarRadius": 0.0, "flattening": 0.0, "dimension": "560x544x454", "sideralOrbit": 1325.886, "sideralRotation": 0.0, "aroundPlanet": null, "discoveredBy": "H. W. Olbers", "discoveryDate": "29/03/1807", "alternativeName": "", "axialTilt": 0, "avgTemp": 0, "mainAnomaly": 0.0, "argPeriapsis": 0.0, "longAscNode": 0.0, "bodyType": "Asteroid", "rel": "https://api.le-systeme-solaire.net/rest/bodies/vesta"}, {"id": "venus", "name": "Vénus", "englishName": "Venus", "isPlanet": true, "moons": null, "semimajorAxis": 108208475, "perihelion": 107477000, "aphelion": 108939000, "eccentricity": 0.0067, "inclination": 3.39, "mass": {"massValue": 4.86747, "massExponent": 24}, "vol": {"volValue": 9.2843, "volExponent": 11}, "density": 5.243, "gravity": 8.87, "escape": 10360.0, "meanRadius": 6051.8, "equaRadius": 6051.8, "polarRadius": 6051.8, "flattening": 0.0, "dimension": "", "sideralOrbit": 224.701, "sideralRotation": -5832.5, "aroundPlanet": null, "discoveredBy": "", "discoveryDate": "", "alternativeName": "", "axialTilt": 177.36, "avgTemp": 737, "mainAnomaly": 50.115, "argPeriapsis": 54.78, "longAscNode": 76.785, "bodyType": "Planet", "rel": "https://api.le-systeme-solaire.net/rest/bodies/venus"}, {"id": "varuna", "name": "(20000) Varuna", "englishName": "20000 Varuna", "isPlanet": false, "moons": null, "semimajorAxis": 6451398000, "perihelion": 6120810000, "aphelion": 6781985000, "eccentricity": 0.051, "inclination": 17.158, "mass": {"massValue": 1.55, "massExponent": 20}, "vol": null, "density": 1.0, "gravity": 0.0, "escape": 0.0, "meanRadius": 330.0, "equaRadius": 0.0, "polarRadius": 0.0, "flattening": 0.0, "dimension": "", "sideralOrbit": 103440.0, "sideralRotation": 0.0, "aroundPlanet": null, "discoveredBy": "Robert S. McMillan Spacewatch", "discoveryDate": "28/11/2000", "alternativeName": "2000 WR106", "axialTilt": 0, "avgTemp": 0, "mainAnomaly": 0.0, "argPeriapsis": 0.0, "longAscNode": 0.0, "bodyType": "Asteroid", "rel": "https://api.le-systeme-solaire.net/rest/bodies/varuna"}, {"id": "vanth", "name": "Vanth", "englishName": "Vanth", "isPlanet": false, "moons": null, "semimajorAxis": 9000, "perihelion": 0, "aphelion": 0, "eccentricity": 0.0009, "inclination": 105.03, "mass": {"massValue": 3.6, "massExponent": 19}, "vol": null, "density": 1.0, "gravity": 0.0, "escape": 0.0, "meanRadius": 434.0, "equaRadius": 0.0, "polarRadius": 0.0, "flattening": 0.0, "dimension": "", "sideralOrbit": 9.539, "sideralRotation": 0.0, "aroundPlanet": {"planet": "orcus", "rel": "https://api.le-systeme-solaire.net/rest/bodies/orcus"}, "discoveredBy": "Michael E. Brown, T.A. Suer", "discoveryDate": "13/11/2005", "alternativeName": "S/2005 (90482) 1", "axialTilt": 0, "avgTemp": 0, "mainAnomaly": 0.0, "argPeriapsis": 0.0, "longAscNode": 0.0, "bodyType": "Moon", "rel": "https://api.le-systeme-solaire.net/rest/bodies/vanth"}, {"id": "valetudo", "name": "Valétudo", "englishName": "Valetudo", "isPlanet": false, "moons": null, "semimajorAxis": 18928000, "perihelion": 0, "aphelion": 0, "eccentricity": 0.222, "inclination": 34.0, "mass": null, "vol": null, "density": 1.0, "gravity": 0.0, "escape": 0.0, "meanRadius": 0.5, "equaRadius": 0.0, "polarRadius": 0.0, "flattening": 0.0, "dimension": "", "sideralOrbit": 533.0, "sideralRotation": 0.0, "aroundPlanet": {"planet": "jupiter", "rel": "https://api.le-systeme-solaire.net/rest/bodies/jupiter"}, "discoveredBy": "Scott Sheppard", "discoveryDate": "17/07/2018", "alternativeName": "S/2016 J 2", "axialTilt": 0, "avgTemp": 0, "mainAnomaly": 0.0, "argPeriapsis": 0.0, "longAscNode": 0.0, "bodyType": "Moon", "rel": "https://api.le-systeme-solaire.net/rest/bodies/valetudo"}, {"id": "uranus", "name": "Uranus", "englishName": "Uranus", "isPlanet": true, "moons": [{"moon": "Ariel", "rel": "https://api.le-systeme-solaire.net/rest/bodies/ariel"}, {"moon": "Umbriel", "rel": "https://api.le-systeme-solaire.net/rest/bodies/umbriel"}, {"moon": "Titania", "rel": "https://api.le-systeme-solaire.net/rest/bodies/titania"}, {"moon": "Obéron", "rel": "https://api.le-systeme-solaire.net/rest/bodies/oberon"}, {"moon": "Miranda", "rel": "https://api.le-systeme-solaire.net/rest/bodies/miranda"}, {"moon": "Cordélia", "rel": "https://api.le-systeme-solaire.net/rest/bodies/cordelia"}, {"moon": "Ophélie", "rel": "https://api.le-systeme-solaire.net/rest/bodies/ophelia"}, {"moon": "Bianca", "rel": "https://api.le-systeme-solaire.net/rest/bodies/bianca"}, {"moon": "Cressida", "rel": "https://api.le-systeme-solaire.net/rest/bodies/cressida"}, {"moon": "Desdémone", "rel": "https://api.le-systeme-solaire.net/rest/bodies/desdemona"}, {"moon": "Juliette", "rel": "https://api.le-systeme-solaire.net/rest/bodies/juliet"}, {"moon": "Portia", "rel": "https://api.le-systeme-solaire.net/rest/bodies/portia"}, {"moon": "Rosalinde", "rel": "https://api.le-systeme-solaire.net/rest/bodies/rosalind"}, {"moon": "Belinda", "rel": "https://api.le-systeme-solaire.net/rest/bodies/belinda"}, {"moon": "Puck", "rel": "https://api.le-systeme-solaire.net/rest/bodies/puck"}, {"moon": "Caliban", "rel": "https://api.le-systeme-solaire.net/rest/bodies/caliban"}, {"moon": "Sycorax", "rel": "https://api.le-systeme-solaire.net/rest/bodies/sycorax"}, {"moon": "Prospero", "rel": "https://api.le-systeme-solaire.net/rest/bodies/prospero"}, {"moon": "Setebos", "rel": "https://api.le-systeme-solaire.net/rest/bodies/setebos"}, {"moon": "Stephano", "rel": "https://api.le-systeme-solaire.net/rest/bodies/stephano"}, {"moon": "Trinculo", "rel": "https://api.le-systeme-solaire.net/rest/bodies/trinculo"}, {"moon": "Francisco", "rel": "https://api.le-systeme-solaire.net/rest/bodies/francisco"}, {"moon": "Margaret", "rel": "https://api.le-systeme-solaire.net/rest/bodies/margaret"}, {"moon": "Ferdinand", "rel": "https://api.le-systeme-solaire.net/rest/bodies/ferdinand"}, {"moon": "Perdita", "rel": "https://api.le-systeme-solaire.net/rest/bodies/perdita"}, {"moon": "Mab", "rel": "https://api.le-systeme-solaire.net/rest/bodies/mab"}, {"moon": "Cupid", "rel": "https://api.le-systeme-solaire.net/rest/bodies/cupid"}], "semimajorAxis": 2870658186, "perihelion": 2734998229, "aphelion": 3006318143, "eccentricity": 0.0457, "inclination": 0.772, "mass": {"massValue": 8.68127, "massExponent": 25}, "vol": {"volValue": 6.833, "volExponent": 13}, "density": 1.27, "gravity": 8.87, "escape": 21380.0, "meanRadius": 25362.0, "equaRadius": 25559.0, "polarRadius": 24973.0, "flattening": 0.02293, "dimension": "", "sideralOrbit": 30685.4, "sideralRotation": -17.24, "aroundPlanet": null, "discoveredBy": "William Herschel", "discoveryDate": "13/03/1781", "alternativeName": "", "axialTilt": 97.77, "avgTemp": 76, "mainAnomaly": 142.2386, "argPeriapsis": 98.862, "longAscNode": 73.967, "bodyType": "Planet", "rel": "https://api.le-systeme-solaire.net/rest/bodies/uranus"}, {"id": "umbriel", "name": "Umbriel", "englishName": "Umbriel", "isPlanet": false, "moons": null, "semimajorAxis": 266000, "perihelion": 265100, "aphelion": 267500, "eccentricity": 0.0039, "inclination": 0.13, "mass": {"massValue": 12.2, "massExponent": 20}, "vol": null, "density": 1.46, "gravity": 0.0, "escape": 0.0, "meanRadius": 584.7, "equaRadius": 0.0, "polarRadius": 0.0, "flattening": 0.0, "dimension": "", "sideralOrbit": 4.14418, "sideralRotation": 99.499, "aroundPlanet": {"planet": "uranus", "rel": "https://api.le-systeme-solaire.net/rest/bodies/uranus"}, "discoveredBy": "William Lassell", "discoveryDate": "24/10/1851", "alternativeName": "", "axialTilt": 0, "avgTemp": 0, "mainAnomaly": 0.0, "argPeriapsis": 0.0, "longAscNode": 0.0, "bodyType": "Moon", "rel": "https://api.le-systeme-solaire.net/rest/bodies/umbriel"}, {"id": "triton", "name": "Triton", "englishName": "Triton", "isPlanet": false, "moons": null, "semimajorAxis": 354760, "perihelion": 354753, "aphelion": 354765, "eccentricity": 2e-05, "inclination": 157.345, "mass": {"massValue": 2.14, "massExponent": 22}, "vol": null, "density": 2.05, "gravity": 0.78, "escape": 0.0, "meanRadius": 1353.4, "equaRadius": 0.0, "polarRadius": 0.0, "flattening": 0.0, "dimension": "", "sideralOrbit": 5.87685, "sideralRotation": 141.0444, "aroundPlanet": {"planet": "neptune", "rel": "https://api.le-systeme-solaire.net/rest/bodies/neptune"}, "discoveredBy": "William Lassell", "discoveryDate": "10/10/1846", "alternativeName": "", "axialTilt": 0, "avgTemp": 0, "mainAnomaly": 0.0, "argPeriapsis": 0.0, "longAscNode": 0.0, "bodyType": "Moon", "rel": "https://api.le-systeme-solaire.net/rest/bodies/triton"}]}, "emitted_at": 1656152784004}}
...
{"type": "LOG", "log": {"level": "INFO", "message": "Read 29 records from bodies stream"}}
{"type": "LOG", "log": {"level": "INFO", "message": "Finished syncing bodies"}}
{"type": "LOG", "log": {"level": "INFO", "message": "SourceSolar runtimes:\nSyncing stream bodies 0:00:35.475559"}}
{"type": "LOG", "log": {"level": "INFO", "message": "Finished syncing SourceSolar"}}
依存関係を処理する#
- HttpSubStream クラスを作成します。
class SolarSubStream(HttpSubStream, SolarStream, ABC):
# 無効なリンクをスキップします
raise_on_http_errors = False
def __init__(self, parent: SolarStream, **kwargs):
super().__init__(parent=parent, **kwargs)
def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
"""
TODO: このメソッドをオーバーライドしてレスポンスの解析方法を定義します。
:return レスポンス内の各レコードを含むイテラブルを返します
"""
yield response.json()
- 詳細インターフェースクラスを作成します。
class DetailOfBodies(SolarSubStream):
primary_key = 'id'
def path(self, *, stream_state: Mapping[str, Any] = None, stream_slice: Mapping[str, Any] = None,
next_page_token: Mapping[str, Any] = None) -> str:
return f'bodies/{stream_slice["id"]}'
def stream_slices(self, **kwargs) -> Iterable[Optional[Mapping[str, any]]]:
for stream_slices in self.parent.stream_slices(sync_mode=SyncMode.full_refresh):
for record in self.parent.read_records(sync_mode=SyncMode.full_refresh, stream_slice=stream_slices):
for data in record["bodies"]:
yield {"id": data["id"]}
def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
if response.status_code == 200:
yield response.json()
else:
self.logger.warn(f"Get data failed: code={response.status_code}, url={response.url}, message={response.text}")
- 詳細インターフェースをインスタンス化します。
class SourceSolar(AbstractSource):
def check_connection(self, logger, config) -> Tuple[bool, any]:
"""
https://github.com/airbytehq/airbyte/blob/master/airbyte-integrations/connectors/source-stripe/source_stripe/source.py#L232
の例を参照してください。
:param config: コネクタのspec.yamlに準拠したユーザー入力構成オブジェクト
:param logger: ロガーオブジェクト
:return Tuple[bool, any]: (True, None)は、入力構成がAPIに正常に接続できる場合、(False, error)はそれ以外の場合。
"""
resp = requests.get(SolarStream.url_base)
status = resp.status_code
logger.info(f"Ping response code: {status}")
if status == 200:
return True, None
else:
message = resp.text
return False, message
def streams(self, config: Mapping[str, Any]) -> List[Stream]:
"""
TODO: 以下のストリームを自分のストリームに置き換えます。
:param config: ユーザー入力構成のマッピング、コネクタ仕様に定義されています。
"""
# TODO: 認証が必要ない場合は、認証者を削除します。
auth = NoAuth()
return [Bodies(authenticator=auth, config=config),
DetailOfBodies(parent=Bodies(authenticator=auth, config=config), authenticator=auth, config=config)]
- レスポンス構造を宣言します。
vi ./source_solar/schemas/detail_of_bodies.json
{
"type": "object",
"required": [
"page_size"
],
"properties": {
"id": {
"type": "string"
},
"name": {
"type": "string"
},
"englishName": {
"type": "string"
},
"isPlanet": {
"type": "boolean"
},
"moons": {
"type": "array",
"items": {
"type": "object",
"properties": {
"moon": {
"type": "string"
},
"rel": {
"type": "string"
}
}
}
},
"semimajorAxis": {
"type": "number"
},
"perihelion": {
"type": "number"
},
"aphelion": {
"type": "number"
},
"eccentricity": {
"type": "number"
},
"inclination": {
"type": "number"
},
"mass": {
"type": "object",
"properties": {
"massValue": {
"type": "number"
},
"massExponent": {
"type": "integer"
}
}
},
"vol": {
"type": "object",
"properties": {
"volValue": {
"type": "number"
},
"volExponent": {
"type": "integer"
}
}
},
"density": {
"type": "number"
},
"gravity": {
"type": "number"
},
"escape": {
"type": "number"
},
"meanRadius": {
"type": "number"
},
"equaRadius": {
"type": "number"
},
"polarRadius": {
"type": "number"
},
"flattening": {
"type": "number"
},
"dimension": {
"type": "string"
},
"sideralOrbit": {
"type": "number"
},
"sideralRotation": {
"type": "number"
},
"aroundPlanet": {
"type": "object",
"properties": {
"planet": {
"type": "string"
},
"rel": {
"type": "string"
}
}
},
"discoveredBy": {
"type": "string"
},
"discoveryDate": {
"type": "string"
},
"alternativeName": {
"type": "string"
},
"axialTilt": {
"type": "number"
},
"avgTemp": {
"type": "number"
},
"mainAnomaly": {
"type": "number"
},
"argPeriapsis": {
"type": "number"
},
"longAscNode": {
"type": "number"
},
"bodyType": {
"type": "string"
}
}
}
- 同期構成
./sample_files/configured_catalog.json
を更新します。
{
"streams": [
{
"stream": {
"name": "bodies",
"json_schema": {},
"supported_sync_modes": [
"full_refresh"
],
"source_defined_primary_key": [
[
"id"
]
]
},
"sync_mode": "full_refresh",
"destination_sync_mode": "overwrite"
},
{
"stream": {
"name": "detail_of_bodies",
"json_schema": {},
"supported_sync_modes": [
"full_refresh"
],
"source_defined_primary_key": [
[
"id"
]
]
},
"sync_mode": "full_refresh",
"destination_sync_mode": "overwrite"
}
]
}
- データ読み取りをテストします。
python main.py read --config ./sample_files/secrets/config.json --catalog ./sample_files/configured_catalog.json
最終的な出力は 284 件のレコードを読み取ったことを示します。
{"type": "LOG", "log": {"level": "INFO", "message": "Read 284 records from detail_of_bodies stream"}}
{"type": "LOG", "log": {"level": "INFO", "message": "Finished syncing detail_of_bodies"}}
{"type": "LOG", "log": {"level": "INFO", "message": "SourceSolar runtimes:\nSyncing stream bodies 0:00:51.038142\nSyncing stream detail_of_bodies 0:01:41.687393"}}
{"type": "LOG", "log": {"level": "INFO", "message": "Finished syncing SourceSolar"}}
カスタムコネクタをデプロイする#
- コネクタの Docker イメージを作成します。
docker build . -t airbyte/source-solar:dev
docker images
表示される内容:
REPOSITORY TAG IMAGE ID CREATED SIZE
airbyte/source-solar dev b42429012ce0 22 seconds ago 120MB
作成後、コネクタリストに表示されます。
カスタムコネクタを使用する#
新しい接続を作成します。
同期を実行します。
同期に成功しました。
データベースを確認します。
選択した Normalized tabular data のため、同期中に JSON オブジェクトがすべて展開されるため、複数のテーブルが作成されます。
その中で solar_bodies は raw_data に対応し、bodies の列は最も原始的なレスポンスデータです。
まとめ#
これで、Airbyte の全体的な入門チュートリアルが終了しました。他の ETL ツールと比較して、Airbyte は API ドキュメントに基づいて明確にサービスを提供し、ETL プロセスを実行できます。dbt と組み合わせることで ELT プロセスも実現でき、全過程で Python スクリプトを記述するだけで済むため、柔軟性が非常に高いです。Airbyte はまだ初期段階にありますが、将来が楽しみです。