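Airbyte is one of the newest open-source ETL tools, launched in July 2020. It differs from other ETL tools in two ways: it offers out-of-the-box connectors through both a UI and an API, and it lets community developers monitor and maintain the tool. Connectors run as Docker containers and can be built in whatever language you choose. Because it is made of modular components with optional subsets of features, Airbyte gives you more flexibility.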
Quick start
Deploy Airbyte locally
git clone https://github.com/airbytehq/airbyte.git
cd airbyte
docker-compose up
Once it has started, open http://localhost:8000
Add a source
Add a destination
First, start a local Postgres database with docker-compose:
- mkdir ./data
- Create docker-compose.yaml:
version: "3"
services:
  postgres:
    image: postgres:14
    container_name: postgres
    restart: always
    environment:
      POSTGRES_DB: postgres
      POSTGRES_USER: postgres
      POSTGRES_PASSWORD: "123456"
    ports:
      - "5432:5432"
    volumes:
      - ./data:/var/lib/postgresql/data
- Start it:
docker-compose up -d
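Before configuring the destination, it can help to confirm that Postgres is accepting connections. The sketch below uses only Python's standard library. The wait_for_port function is a hypothetical helper and is not part of Airbyte or Docker. It only checks reachability from the host machine. Airbyte itself runs in containers, where localhost refers to the container rather than to the host.

```python
import socket
import time


def wait_for_port(host: str, port: int, timeout: float = 30.0) -> bool:
    """Return True once a TCP connection to host:port succeeds, False if `timeout` seconds pass first."""
    deadline = time.monotonic() + timeout
    while True:
        try:
            # create_connection raises OSError (e.g. ConnectionRefusedError) while nothing is listening
            with socket.create_connection((host, port), timeout=1.0):
                return True
        except OSError:
            if time.monotonic() >= deadline:
                return False
            time.sleep(0.5)  # wait a little before retrying
```

Usage: wait_for_port("localhost", 5432) returns True once the container from docker-compose.yaml is listening. It returns False if the port is still closed after 30 seconds.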
Then configure the destination in Airbyte.
Create a connection
Trigger a sync
Check the results
Developing a custom API source with the Airbyte CDK
Generate the source template
cd airbyte-integrations/connector-templates/generator # assumes you are starting from the root of the Airbyte project.
# Install NPM from https://www.npmjs.com/get-npm if you don't have it
./generate.sh
Select Python HTTP API Source and enter a name for the source. This tutorial uses solar.
Install the Python dependencies
cd ../../connectors/source-solar
python -m venv .venv # Create a virtual environment in the .venv directory
source .venv/bin/activate # enable the venv
pip install -r requirements.txt
Run the example
python main.py spec
The following output means the project was initialized successfully:
{"type": "SPEC", "spec": {"documentationUrl": "https://docsurl.com", "connectionSpecification": {"$schema": "http://json-schema.org/draft-07/schema#", "title": "Solar Spec", "type": "object", "required": ["TODO"], "additionalProperties": false, "properties": {"TODO": {"type": "string", "description": "describe me"}}}}}
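Define the input parameters
The simplest way to define the inputs is to create a spec.yaml file at source_<name>/spec.yaml. This file describes the connector's inputs according to the ConnectorSpecification schema, and it is a good starting point when developing a source. Use JSON Schema to define what the inputs are (for example, a username and password).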
vi ./source_solar/spec.yaml
documentationUrl: https://api.le-systeme-solaire.net/swagger/#/
connectionSpecification:
  $schema: http://json-schema.org/draft-07/schema#
  title: Solar System openData
  type: object
  required:
    - page_size
  additionalProperties: false
  properties:
    page_size:
      type: integer
      title: Page size
      description: Number of records per page
      default: 10
      minimum: 1
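To make the spec concrete, here is a hand-rolled sketch of what this connectionSpecification accepts. It is an illustration only. The validate_config function is a hypothetical helper, and it covers just the keywords used here: required, additionalProperties, type: integer and minimum. A real validator, such as the jsonschema package, implements the full specification. In JSON Schema, default is only an annotation, so this check does not apply it.

```python
from typing import Any, Dict, Mapping

# The connectionSpecification from spec.yaml, written as a Python dict
SPEC: Dict[str, Any] = {
    "type": "object",
    "required": ["page_size"],
    "additionalProperties": False,
    "properties": {
        # "default" is only an annotation in JSON Schema; this check does not apply it
        "page_size": {"type": "integer", "title": "Page size", "default": 10, "minimum": 1},
    },
}


def _is_integer(value: Any) -> bool:
    # JSON Schema integers are whole numbers but not booleans (bool subclasses int in Python)
    if isinstance(value, bool):
        return False
    return isinstance(value, int) or (isinstance(value, float) and value.is_integer())


def validate_config(config: Mapping[str, Any], spec: Mapping[str, Any] = SPEC) -> None:
    """Raise ValueError if config violates one of the keywords used in SPEC."""
    properties = spec.get("properties", {})
    for name in spec.get("required", []):
        if name not in config:
            raise ValueError(f"missing required property: {name}")
    if spec.get("additionalProperties") is False:
        extra = sorted(set(config) - set(properties))
        if extra:
            raise ValueError(f"unexpected properties: {extra}")
    for name, value in config.items():
        rules = properties.get(name, {})
        if rules.get("type") == "integer" and not _is_integer(value):
            raise ValueError(f"{name} must be an integer")
        if "minimum" in rules and value < rules["minimum"]:
            raise ValueError(f"{name} must be >= {rules['minimum']}")
```

For example, validate_config({"page_size": 10}) passes. Both {} and {"page_size": 0} raise ValueError.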
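Connection health check
This operation verifies that the input configuration supplied by the user can be used to connect to the underlying data source. That configuration holds the values described in spec.yaml. To send requests to an API we would normally have to supply access credentials. Here the check is simple because this API requires no credentials. Edit ./source_solar/source.py: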
# Imports used by the code in this tutorial (the template generates most of them at the top of source.py)
from abc import ABC
from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Tuple

import requests
from airbyte_cdk.sources import AbstractSource
from airbyte_cdk.sources.streams import Stream
from airbyte_cdk.sources.streams.http import HttpStream


class SolarStream(HttpStream, ABC):
    """
    Base class for all streams of this connector. It holds what every stream shares
    at the API level: the base URL, the pagination strategy, response parsing, etc.
    Each concrete stream (typically one per API resource) extends this class.
    """

    # Base URL of the Solar System OpenData API (required)
    url_base = "https://api.le-systeme-solaire.net/rest/"

    ...


class SourceSolar(AbstractSource):
    def check_connection(self, logger, config) -> Tuple[bool, Any]:
        """
        :param config: the user-input config object conforming to the connector's spec.yaml
        :param logger: logger object
        :return Tuple[bool, Any]: (True, None) if the input config can be used to connect to the API successfully, (False, error) otherwise.
        """
        # The API needs no credentials, so a plain GET on the base URL is enough
        resp = requests.get(SolarStream.url_base)
        status = resp.status_code
        logger.info(f"Ping response code: {status}")
        if status == 200:
            return True, None
        else:
            message = resp.text
            return False, message
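check_connection only has to return (True, None) or (False, reason). The stdlib-only sketch below shows the same logic with two additions that are assumptions, not part of the tutorial. First, it sets a request timeout; the check above calls requests.get without one. Second, it treats network errors as a failed check rather than letting them raise. The fetch_status function and the injectable fetch parameter are hypothetical. In the CDK code you would get the same effect with requests.get(url, timeout=...) and by catching requests.RequestException.

```python
import urllib.error
import urllib.request
from typing import Any, Callable, Optional, Tuple

URL_BASE = "https://api.le-systeme-solaire.net/rest/"


def fetch_status(url: str, timeout: float = 10.0) -> Tuple[int, str]:
    """Return (status code, body) for a GET request, including HTTP error responses."""
    try:
        with urllib.request.urlopen(url, timeout=timeout) as resp:
            return resp.status, resp.read().decode("utf-8", errors="replace")
    except urllib.error.HTTPError as err:
        # urllib raises on 4xx/5xx; turn the error back into (code, body)
        return err.code, err.read().decode("utf-8", errors="replace")


def check_connection(fetch: Callable[[str], Tuple[int, str]] = fetch_status) -> Tuple[bool, Optional[Any]]:
    """(True, None) on HTTP 200, otherwise (False, reason)."""
    try:
        status, body = fetch(URL_BASE)
    except OSError as err:
        # URLError, refused connections and timeouts are all OSError subclasses
        return False, f"request failed: {err}"
    if status == 200:
        return True, None
    return False, body
```

The fetch parameter lets you test the decision logic without network access by passing a function that returns a canned (status, body) pair.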
Create an input file and run the check:
mkdir -p sample_files/secrets
echo '{"page_size": 10}' > ./sample_files/secrets/config.json
python main.py check --config ./sample_files/secrets/config.json
The following output means the connection check succeeded:
{"type": "LOG", "log": {"level": "INFO", "message": "Ping response code: 200"}}
{"type": "LOG", "log": {"level": "INFO", "message": "Check succeeded"}}
{"type": "CONNECTION_STATUS", "connectionStatus": {"status": "SUCCEEDED"}}
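Each line the connector prints is a JSON message with a type field, so the check result can also be read programmatically, for example in a CI script. The connection_status function is a hypothetical helper that scans the output for the CONNECTION_STATUS message.

```python
import json
from typing import Iterable, Optional


def connection_status(lines: Iterable[str]) -> Optional[str]:
    """Return the status of the CONNECTION_STATUS message in a connector's stdout, or None if absent."""
    for line in lines:
        line = line.strip()
        if not line.startswith("{"):
            continue  # skip anything that is not a JSON message
        message = json.loads(line)
        if message.get("type") == "CONNECTION_STATUS":
            return message["connectionStatus"]["status"]
    return None
```

For example, you can pass it subprocess.run(["python", "main.py", "check", "--config", "./sample_files/secrets/config.json"], capture_output=True, text=True).stdout.splitlines().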
Declare a stream
Declaring a stream for an API endpoint takes only a few steps:
- Add an HttpStream class to ./source_solar/source.py:
class SolarStream(HttpStream, ABC):
    """
    Base class for all streams of this connector: base URL, pagination and response parsing.
    """

    # Base URL of the Solar System OpenData API (required)
    url_base = "https://api.le-systeme-solaire.net/rest/"
    # Current page number; Bodies advances it in next_page_token
    page = 1

    def __init__(self, config: Mapping[str, Any], **kwargs):
        super().__init__(**kwargs)
        # page_size comes from the user config defined in spec.yaml
        self.page_size = config["page_size"]

    def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
        # Default: no pagination. Return a mapping describing the next page, or None when done.
        return None

    def request_params(
        self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None
    ) -> MutableMapping[str, Any]:
        # Default: no extra query parameters
        return {}

    def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
        # Emit the whole JSON body of each response as one record
        yield response.json()


class Bodies(SolarStream):
    primary_key = "id"

    def path(self, *, stream_state: Mapping[str, Any] = None, stream_slice: Mapping[str, Any] = None,
             next_page_token: Mapping[str, Any] = None) -> str:
        # GET https://api.le-systeme-solaire.net/rest/bodies
        return "bodies"

    def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
        response_data = response.json()
        # A full page means there may be more data; a shorter page ends pagination
        if len(response_data["bodies"]) == self.page_size:
            self.page += 1
            return {"page": f"{self.page}, {self.page_size}"}
        return None

    def request_params(
        self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None
    ) -> MutableMapping[str, Any]:
        # Sort by id (descending) and request page "<page number>, <page size>"
        params = {
            "order": "id,desc",
            "page": f"{self.page}, {self.page_size}"
        }
        if next_page_token:
            params.update(next_page_token)
        return params
- Instantiate the class in SourceSolar:
# Add this import at the top of source.py
from airbyte_cdk.sources.streams.http.auth import NoAuth


class SourceSolar(AbstractSource):
    def check_connection(self, logger, config) -> Tuple[bool, Any]:
        # Unchanged: ping the base URL, since the API needs no credentials
        resp = requests.get(SolarStream.url_base)
        status = resp.status_code
        logger.info(f"Ping response code: {status}")
        if status == 200:
            return True, None
        else:
            message = resp.text
            return False, message

    def streams(self, config: Mapping[str, Any]) -> List[Stream]:
        """
        :param config: A Mapping of the user input configuration as defined in the connector spec.
        """
        # The API is public, so no authentication is needed
        auth = NoAuth()
        return [Bodies(authenticator=auth, config=config)]
- Create bodies.json in ./source_solar/schemas:
{
"type": "object",
"required": [
"page_size"
],
"properties": {
"bodies": {
"type": "array",
"items": {
"type": "object",
"properties": {
"id": {
"type": "string"
},
"name": {
"type": "string"
},
"englishName": {
"type": "string"
},
"isPlanet": {
"type": "boolean"
},
"moons": {
"type": "array",
"items": {
"type": "object",
"properties": {
"moon": {
"type": "string"
},
"rel": {
"type": "string"
}
}
}
},
"semimajorAxis": {
"type": "number"
},
"perihelion": {
"type": "number"
},
"aphelion": {
"type": "number"
},
"eccentricity": {
"type": "number"
},
"inclination": {
"type": "number"
},
"mass": {
"type": "object",
"properties": {
"massValue": {
"type": "number"
},
"massExponent": {
"type": "integer"
}
}
},
"vol": {
"type": "object",
"properties": {
"volValue": {
"type": "number"
},
"volExponent": {
"type": "integer"
}
}
},
"density": {
"type": "number"
},
"gravity": {
"type": "number"
},
"escape": {
"type": "number"
},
"meanRadius": {
"type": "number"
},
"equaRadius": {
"type": "number"
},
"polarRadius": {
"type": "number"
},
"flattening": {
"type": "number"
},
"dimension": {
"type": "string"
},
"sideralOrbit": {
"type": "number"
},
"sideralRotation": {
"type": "number"
},
"aroundPlanet": {
"type": "object",
"properties": {
"planet": {
"type": "string"
},
"rel": {
"type": "string"
}
}
},
"discoveredBy": {
"type": "string"
},
"discoveryDate": {
"type": "string"
},
"alternativeName": {
"type": "string"
},
"axialTilt": {
"type": "number"
},
"avgTemp": {
"type": "number"
},
"mainAnomaly": {
"type": "number"
},
"argPeriapsis": {
"type": "number"
},
"longAscNode": {
"type": "number"
},
"bodyType": {
"type": "string"
},
"rel": {
"type": "string"
}
}
}
}
}
}
- Verify the stream with discover:
python main.py discover --config ./sample_files/secrets/config.json
The expected output is:
{"type": "CATALOG", "catalog": {"streams": [{"name": "bodies", "json_schema": {"type": "object", "required": ["page_size"], "properties": {"bodies": {"type": "array", "items": {"type": "object", "properties": {"id": {"type": "string"}, "name": {"type": "string"}, "englishName": {"type": "string"}, "isPlanet": {"type": "boolean"}, "moons": {"type": "array", "items": {"type": "object", "properties": {"moon": {"type": "string"}, "rel": {"type": "string"}}}}, "semimajorAxis": {"type": "number"}, "perihelion": {"type": "number"}, "aphelion": {"type": "number"}, "eccentricity": {"type": "number"}, "inclination": {"type": "number"}, "mass": {"type": "object", "properties": {"massValue": {"type": "number"}, "massExponent": {"type": "integer"}}}, "vol": {"type": "object", "properties": {"volValue": {"type": "number"}, "volExponent": {"type": "integer"}}}, "density": {"type": "number"}, "gravity": {"type": "number"}, "escape": {"type": "number"}, "meanRadius": {"type": "number"}, "equaRadius": {"type": "number"}, "polarRadius": {"type": "number"}, "flattening": {"type": "number"}, "dimension": {"type": "string"}, "sideralOrbit": {"type": "number"}, "sideralRotation": {"type": "number"}, "aroundPlanet": {"type": "object", "properties": {"planet": {"type": "string"}, "rel": {"type": "string"}}}, "discoveredBy": {"type": "string"}, "discoveryDate": {"type": "string"}, "alternativeName": {"type": "string"}, "axialTilt": {"type": "number"}, "avgTemp": {"type": "number"}, "mainAnomaly": {"type": "number"}, "argPeriapsis": {"type": "number"}, "longAscNode": {"type": "number"}, "bodyType": {"type": "string"}, "rel": {"type": "string"}}}}}}, "supported_sync_modes": ["full_refresh"], "source_defined_primary_key": [["id"]]}]}}
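The pagination in Bodies keeps requesting pages for as long as the last page came back full. The sketch below replays that stop condition against an in-memory fake of the API, so it runs without network access. The names paginate and fake_api are hypothetical. The fake assumes that page=<n>,<size> selects the n-th block of size items.

```python
from typing import Callable, Dict, Iterator, List

Page = Dict[str, List[dict]]


def paginate(fetch_page: Callable[[int, int], Page], page_size: int) -> Iterator[Page]:
    """Yield pages until one comes back with fewer than page_size bodies (the rule Bodies uses)."""
    page = 1
    while True:
        # Corresponds to the stream's query parameter page="<page>, <page_size>"
        response = fetch_page(page, page_size)
        yield response
        if len(response["bodies"]) < page_size:
            return  # a short or empty page: no further pages
        page += 1


def fake_api(total: int) -> Callable[[int, int], Page]:
    """Hypothetical in-memory stand-in for GET bodies?page=<page>,<size> holding `total` bodies."""
    bodies = [{"id": f"body-{i}"} for i in range(total)]

    def fetch_page(page: int, size: int) -> Page:
        start = (page - 1) * size
        return {"bodies": bodies[start:start + size]}

    return fetch_page
```

Assume the API returns 284 bodies, the count the detail stream reports later in this tutorial. With page_size 10, the loop then fetches 29 pages: 28 full pages and a last page with 4 bodies. Because parse_response emits each page as one record, this matches the "Read 29 records from bodies stream" line in the read output. If the total is an exact multiple of page_size, the loop makes one extra request that returns an empty page.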
Read data
Create a configured catalog for the read:
vi ./sample_files/configured_catalog.json
{
"streams": [
{
"stream": {
"name": "bodies",
"json_schema": {},
"supported_sync_modes": [
"full_refresh"
],
"source_defined_primary_key": [
[
"id"
]
]
},
"sync_mode": "full_refresh",
"destination_sync_mode": "overwrite"
}
]
}
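Writing the configured catalog by hand gets repetitive once there are several streams. The helpers below generate it. They assume that every stream uses full_refresh/overwrite with id as the primary key, as in this tutorial. The configured_catalog and write_catalog functions are hypothetical helpers.

```python
import json
from typing import Any, Dict, List


def configured_catalog(stream_names: List[str], primary_key: str = "id") -> Dict[str, Any]:
    """Build a full_refresh/overwrite configured catalog in the shape of sample_files/configured_catalog.json."""
    return {
        "streams": [
            {
                "stream": {
                    "name": name,
                    "json_schema": {},
                    "supported_sync_modes": ["full_refresh"],
                    "source_defined_primary_key": [[primary_key]],
                },
                "sync_mode": "full_refresh",
                "destination_sync_mode": "overwrite",
            }
            for name in stream_names
        ]
    }


def write_catalog(path: str, stream_names: List[str]) -> None:
    """Write the generated catalog as indented JSON."""
    with open(path, "w", encoding="utf-8") as f:
        json.dump(configured_catalog(stream_names), f, indent=2)
```

For example, write_catalog("./sample_files/configured_catalog.json", ["bodies"]) produces the file above. Extending the list with a second stream name such as detail_of_bodies adds another entry with the same settings.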
Run the read:
python main.py read --config ./sample_files/secrets/config.json --catalog ./sample_files/configured_catalog.json
Output:
{"type": "LOG", "log": {"level": "INFO", "message": "Starting syncing SourceSolar"}}
{"type": "LOG", "log": {"level": "INFO", "message": "Syncing stream: bodies "}}
{"type": "RECORD", "record": {"stream": "bodies", "data": {"bodies": [{"id": "ymir", "name": "Ymir", "englishName": "Ymir", "isPlanet": false, "moons": null, "semimajorAxis": 23040000, "perihelion": 0, "aphelion": 0, "eccentricity": 0.187, "inclination": 167.9, "mass": {"massValue": 3.97, "massExponent": 15}, "vol": null, "density": 1.0, "gravity": 0.0, "escape": 0.0, "meanRadius": 9.0, "equaRadius": 0.0, "polarRadius": 0.0, "flattening": 0.0, "dimension": "", "sideralOrbit": 1315.4, "sideralRotation": 0.0, "aroundPlanet": {"planet": "saturne", "rel": "https://api.le-systeme-solaire.net/rest/bodies/saturne"}, "discoveredBy": "Brett J. Gladman", "discoveryDate": "07/08/2000", "alternativeName": "S/2000 S 1", "axialTilt": 0, "avgTemp": 0, "mainAnomaly": 0.0, "argPeriapsis": 0.0, "longAscNode": 0.0, "bodyType": "Moon", "rel": "https://api.le-systeme-solaire.net/rest/bodies/ymir"}, ...]}, "emitted_at": 1656152784004}}
...
{"type": "LOG", "log": {"level": "INFO", "message": "Read 29 records from bodies stream"}}
{"type": "LOG", "log": {"level": "INFO", "message": "Finished syncing bodies"}}
{"type": "LOG", "log": {"level": "INFO", "message": "SourceSolar runtimes:\nSyncing stream bodies 0:00:35.475559"}}
{"type": "LOG", "log": {"level": "INFO", "message": "Finished syncing SourceSolar"}}
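Every RECORD message wraps one record in an envelope. The envelope contains the stream name, the data, and emitted_at, a Unix timestamp in milliseconds. The sketch below does two things. It counts records per stream, so you can cross-check the "Read N records" log line, and it decodes emitted_at. The count_records and emitted_at_utc functions are hypothetical helpers.

```python
import json
from collections import Counter
from datetime import datetime, timedelta, timezone
from typing import Iterable

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def count_records(lines: Iterable[str]) -> Counter:
    """Count RECORD messages per stream in the output of `python main.py read`."""
    counts: Counter = Counter()
    for line in lines:
        line = line.strip()
        if not line.startswith("{"):
            continue  # skip non-JSON lines such as an elided "..."
        message = json.loads(line)
        if message.get("type") == "RECORD":
            counts[message["record"]["stream"]] += 1
    return counts


def emitted_at_utc(message: dict) -> datetime:
    """Decode a RECORD message's emitted_at (epoch milliseconds) using integer arithmetic only."""
    return EPOCH + timedelta(milliseconds=message["record"]["emitted_at"])
```

For the record shown above, emitted_at 1656152784004 decodes to 2022-06-25 10:26:24.004 UTC.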
Handle stream dependencies
- Create an HttpSubStream class:
# Add this import at the top of source.py
from airbyte_cdk.sources.streams.http import HttpSubStream


class SolarSubStream(HttpSubStream, SolarStream, ABC):
    # Do not raise on HTTP errors, so invalid links are skipped instead of failing the sync
    raise_on_http_errors = False

    def __init__(self, parent: SolarStream, **kwargs):
        super().__init__(parent=parent, **kwargs)

    def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
        # Emit the whole JSON body of each response as one record
        yield response.json()
- Create the class for the detail endpoint:
# Add this import at the top of source.py
from airbyte_cdk.models import SyncMode


class DetailOfBodies(SolarSubStream):
    primary_key = "id"

    def path(self, *, stream_state: Mapping[str, Any] = None, stream_slice: Mapping[str, Any] = None,
             next_page_token: Mapping[str, Any] = None) -> str:
        # GET https://api.le-systeme-solaire.net/rest/bodies/{id}
        return f'bodies/{stream_slice["id"]}'

    def stream_slices(self, **kwargs) -> Iterable[Optional[Mapping[str, Any]]]:
        # Read every page of the parent Bodies stream and emit one slice per body id
        for parent_slice in self.parent.stream_slices(sync_mode=SyncMode.full_refresh):
            for record in self.parent.read_records(sync_mode=SyncMode.full_refresh, stream_slice=parent_slice):
                for data in record["bodies"]:
                    yield {"id": data["id"]}

    def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
        if response.status_code == 200:
            yield response.json()
        else:
            # Log and skip bodies whose detail request failed
            self.logger.warning(f"Get data failed: code={response.status_code}, url={response.url}, message={response.text}")
- Instantiate the detail stream:
class SourceSolar(AbstractSource):
    def check_connection(self, logger, config) -> Tuple[bool, Any]:
        # Unchanged: ping the base URL, since the API needs no credentials
        resp = requests.get(SolarStream.url_base)
        status = resp.status_code
        logger.info(f"Ping response code: {status}")
        if status == 200:
            return True, None
        else:
            message = resp.text
            return False, message

    def streams(self, config: Mapping[str, Any]) -> List[Stream]:
        """
        :param config: A Mapping of the user input configuration as defined in the connector spec.
        """
        # The API is public, so no authentication is needed
        auth = NoAuth()
        return [
            Bodies(authenticator=auth, config=config),
            # The parent gets its own Bodies instance, so its page counter is independent of the bodies stream
            DetailOfBodies(parent=Bodies(authenticator=auth, config=config), authenticator=auth, config=config),
        ]
- Declare the response schema:
vi ./source_solar/schemas/detail_of_bodies.json
{
"type": "object",
"properties": {
"id": {
"type": "string"
},
"name": {
"type": "string"
},
"englishName": {
"type": "string"
},
"isPlanet": {
"type": "boolean"
},
"moons": {
"type": "array",
"items": {
"type": "object",
"properties": {
"moon": {
"type": "string"
},
"rel": {
"type": "string"
}
}
}
},
"semimajorAxis": {
"type": "number"
},
"perihelion": {
"type": "number"
},
"aphelion": {
"type": "number"
},
"eccentricity": {
"type": "number"
},
"inclination": {
"type": "number"
},
"mass": {
"type": "object",
"properties": {
"massValue": {
"type": "number"
},
"massExponent": {
"type": "integer"
}
}
},
"vol": {
"type": "object",
"properties": {
"volValue": {
"type": "number"
},
"volExponent": {
"type": "integer"
}
}
},
"density": {
"type": "number"
},
"gravity": {
"type": "number"
},
"escape": {
"type": "number"
},
"meanRadius": {
"type": "number"
},
"equaRadius": {
"type": "number"
},
"polarRadius": {
"type": "number"
},
"flattening": {
"type": "number"
},
"dimension": {
"type": "string"
},
"sideralOrbit": {
"type": "number"
},
"sideralRotation": {
"type": "number"
},
"aroundPlanet": {
"type": "object",
"properties": {
"planet": {
"type": "string"
},
"rel": {
"type": "string"
}
}
},
"discoveredBy": {
"type": "string"
},
"discoveryDate": {
"type": "string"
},
"alternativeName": {
"type": "string"
},
"axialTilt": {
"type": "number"
},
"avgTemp": {
"type": "number"
},
"mainAnomaly": {
"type": "number"
},
"argPeriapsis": {
"type": "number"
},
"longAscNode": {
"type": "number"
},
"bodyType": {
"type": "string"
}
}
}
- Update the sync configuration in ./sample_files/configured_catalog.json:
{
"streams": [
{
"stream": {
"name": "bodies",
"json_schema": {},
"supported_sync_modes": [
"full_refresh"
],
"source_defined_primary_key": [
[
"id"
]
]
},
"sync_mode": "full_refresh",
"destination_sync_mode": "overwrite"
},
{
"stream": {
"name": "detail_of_bodies",
"json_schema": {},
"supported_sync_modes": [
"full_refresh"
],
"source_defined_primary_key": [
[
"id"
]
]
},
"sync_mode": "full_refresh",
"destination_sync_mode": "overwrite"
}
]
}
- Test reading the data:
python main.py read --config ./sample_files/secrets/config.json --catalog ./sample_files/configured_catalog.json
The final output shows that 284 records were read:
{"type": "LOG", "log": {"level": "INFO", "message": "Read 284 records from detail_of_bodies stream"}}
{"type": "LOG", "log": {"level": "INFO", "message": "Finished syncing detail_of_bodies"}}
{"type": "LOG", "log": {"level": "INFO", "message": "SourceSolar runtimes:\nSyncing stream bodies 0:00:51.038142\nSyncing stream detail_of_bodies 0:01:41.687393"}}
{"type": "LOG", "log": {"level": "INFO", "message": "Finished syncing SourceSolar"}}
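DetailOfBodies fans out from its parent. Its stream_slices turns every body id found in the parent's pages into one slice, and each slice becomes one request to bodies/{id}. The sketch below reproduces that flow with plain functions and a fake fetcher. The detail_slices and read_details functions and the (status, body) fetch signature are hypothetical, not CDK APIs. Skipping non-200 responses mirrors raise_on_http_errors = False combined with the status check in parse_response.

```python
from typing import Callable, Dict, Iterable, Iterator, List, Optional, Tuple

Fetch = Callable[[str], Tuple[int, Optional[dict]]]


def detail_slices(parent_records: Iterable[Dict[str, List[dict]]]) -> Iterator[Dict[str, str]]:
    """One slice per body id in the parent's page records, like DetailOfBodies.stream_slices."""
    for record in parent_records:
        for body in record["bodies"]:
            yield {"id": body["id"]}


def read_details(slices: Iterable[Dict[str, str]], fetch: Fetch) -> Iterator[dict]:
    """Request bodies/{id} for each slice and keep only successful responses."""
    for stream_slice in slices:
        status, body = fetch(f"bodies/{stream_slice['id']}")
        if status == 200:
            yield body
        # Non-200 responses are skipped; the connector logs a warning instead of failing
```

Every slice costs one HTTP request, and the parent pages are read again to produce the slices. This helps explain why detail_of_bodies takes longer than bodies in the runtimes above.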
Deploy the custom connector
- Build the connector's Docker image:
docker build . -t airbyte/source-solar:dev
docker images
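Output:
REPOSITORY             TAG   IMAGE ID       CREATED          SIZE
airbyte/source-solar   dev   b42429012ce0   22 seconds ago   120MB
After you register the image in Airbyte as a new connector (Docker repository airbyte/source-solar, tag dev), it appears in the connector list.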
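Use the custom connector
Create a new connection
Run the sync
The sync succeeds
Inspect the database
Because we chose Normalized tabular data, the sync automatically expands our JSON objects, so the data ends up in several tables.
Among them, solar_bodies corresponds to the raw data: its bodies column holds the original, unmodified response.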
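Roughly speaking, normalization turns each nested object, and each array of objects, into its own child table linked to its parent row. The sketch below illustrates the idea on a single record. It is a simplification, not Airbyte's dbt-based implementation. The table names it derives and the _row_id/_parent_id columns are hypothetical, and Airbyte's real names differ. Following the author's observation about solar_bodies, the parent row also keeps the nested JSON as a column.

```python
import itertools
from typing import Any, Dict, Iterator, List, Optional

Tables = Dict[str, List[Dict[str, Any]]]


def _unnest(table: str, obj: Dict[str, Any], tables: Tables,
            ids: Iterator[int], parent_id: Optional[int]) -> None:
    row_id = next(ids)
    # _row_id / _parent_id are hypothetical link columns
    row: Dict[str, Any] = {"_row_id": row_id, "_parent_id": parent_id}
    tables.setdefault(table, []).append(row)
    for key, value in obj.items():
        row[key] = value  # every value, including nested JSON, stays a column of this table
        child = f"{table}_{key}"
        if isinstance(value, dict):
            _unnest(child, value, tables, ids, row_id)
        elif isinstance(value, list):
            for item in value:
                if isinstance(item, dict):
                    _unnest(child, item, tables, ids, row_id)


def normalize(table: str, record: Dict[str, Any]) -> Tables:
    """Explode one record into a parent table plus one child table per nested object or array of objects."""
    tables: Tables = {}
    _unnest(table, record, tables, itertools.count(1), None)
    return tables
```

For a bodies page containing Uranus, this yields a parent table and child tables for the bodies array, each body's mass object and its moons array. Each child row points back to its parent row.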
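Summary
That concludes this introductory Airbyte tutorial. Compared with other ETL tools, Airbyte can run ETL directly against any service whose API is clearly documented, and combined with dbt it can also implement an ELT workflow. The whole process only requires writing Python, which makes it very flexible. Airbyte is still young, but it looks promising.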