
Airbyte in Practice

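Airbyte is one of the newer open-source ETL tools, launched in July 2020. It differs from other ETL tools in that it offers out-of-the-box connectors through both a UI and an API, and lets community developers monitor and maintain the tool. Connectors run as Docker containers and can be built in the language of your choice. By providing modular components and optional feature subsets, Airbyte offers greater flexibility.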

Quick Start#

Deploy Airbyte locally#

git clone https://github.com/airbytehq/airbyte.git
cd airbyte
docker-compose up

Once it has started, open http://localhost:8000

Add a source#

image-20220619122412393

Add a destination#

First, start a local Postgres database with docker-compose:

  1. mkdir ./data

  2. Create docker-compose.yaml:

    version: "3"
    services:
      postgres:
        image: postgres:14
        container_name: postgres
        restart: always
        environment:
          POSTGRES_DB: postgres
          POSTGRES_USER: postgres
          POSTGRES_PASSWORD: 123456
        ports:
          - "5432:5432"
        volumes:
          - ./data:/var/lib/postgresql/data
    
  3. Run docker-compose up -d to start it.
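
Before configuring the destination, it can help to confirm that something is accepting connections on port 5432. Here is a minimal sketch using only the Python standard library. The helper name is_port_open is hypothetical, and the host and port assume the docker-compose.yaml above. An open TCP port only shows that the container is listening, not that Postgres has finished initializing.

```python
import socket


def is_port_open(host: str, port: int, timeout: float = 2.0) -> bool:
    """Return True if a TCP connection to host:port can be established."""
    try:
        # create_connection raises OSError on refusal, timeout, or DNS failure.
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False


if __name__ == "__main__":
    # 5432 is the port published by the docker-compose.yaml above.
    print("postgres reachable:", is_port_open("localhost", 5432))
```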

Then configure the destination in Airbyte:

image-20220619134741698

Set up a connection#

image-20220619134925755

image-20220619134942144

image-20220619135033394

Trigger a sync#

image-20220619135226032

View the results#

image-20220619135522388

Develop a custom API source with the Airbyte CDK#

Create the source template#

cd airbyte-integrations/connector-templates/generator # assumes you are starting from the root of the Airbyte project.
# Install NPM from https://www.npmjs.com/get-npm if you don't have it
./generate.sh

Select Python HTTP API Source and enter a name for the source; this tutorial uses solar.

image-20220619140718389

Install the Python dependencies#

cd ../../connectors/source-solar
python -m venv .venv # Create a virtual environment in the .venv directory
source .venv/bin/activate # enable the venv
pip install -r requirements.txt

Run the example#

python main.py spec

The following output indicates that the project was initialized successfully:

{"type": "SPEC", "spec": {"documentationUrl": "https://docsurl.com", "connectionSpecification": {"$schema": "http://json-schema.org/draft-07/schema#", "title": "Solar Spec", "type": "object", "required": ["TODO"], "additionalProperties": false, "properties": {"TODO": {"type": "string", "description": "describe me"}}}}}

Define the input parameters#

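The simplest way to do this is to create a spec.yaml file at source_<name>/spec.yaml that describes the connector's inputs according to the ConnectorSpecification schema. This is a good starting point when developing a source. Use JSON Schema to define what the inputs are (for example, a username and password).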

vi ./source_solar/spec.yaml
documentationUrl: https://api.le-systeme-solaire.net/swagger/#/
connectionSpecification:
  $schema: http://json-schema.org/draft-07/schema#
  title: Solar System openData
  type: object
  required:
    - page_size
  additionalProperties: false
  properties:
    page_size:
      type: integer
      title: Page size
      description: Number of records per page
      default: 10
      minimum: 1
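
To make the effect of this spec concrete, here is a hand-rolled check that mirrors its rules for page_size (required, an integer, at least 1, no other properties). Airbyte validates the configuration against the JSON Schema itself. The function validate_config below is a hypothetical illustration and not part of the CDK.

```python
from typing import Any, List, Mapping


def validate_config(config: Mapping[str, Any]) -> List[str]:
    """Return a list of problems; an empty list means the config is acceptable."""
    errors: List[str] = []
    # additionalProperties: false -> only page_size is allowed.
    for key in config:
        if key != "page_size":
            errors.append(f"unexpected property: {key}")
    if "page_size" not in config:
        errors.append("missing required property: page_size")
        return errors
    value = config["page_size"]
    # bool is a subclass of int in Python, so exclude it explicitly.
    if not isinstance(value, int) or isinstance(value, bool):
        errors.append("page_size must be an integer")
    elif value < 1:
        errors.append("page_size must be >= 1")
    return errors


if __name__ == "__main__":
    print(validate_config({"page_size": 10}))
    print(validate_config({"page_size": 0}))
```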

Connection health check#

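This operation verifies that the user-supplied configuration can be used to connect to the underlying data source. Note that this configuration holds the values described in spec.yaml. Making requests to an API normally requires specifying access credentials. In our case the check is fairly simple, because this API needs none. Edit ./source_solar/source.py: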

class SolarStream(HttpStream, ABC):
    """
    TODO remove this comment

    This class represents a stream output by the connector.
    This is an abstract base class meant to contain all the common functionality at the API level e.g: the API base URL, pagination strategy,
    parsing responses etc..

    Each stream should extend this class (or another abstract subclass of it) to specify behavior unique to that stream.

    Typically for REST APIs each stream corresponds to a resource in the API. For example if the API
    contains the endpoints
        - GET v1/customers
        - GET v1/employees

    then you should have three classes:
    `class SolarStream(HttpStream, ABC)` which is the current class
    `class Customers(SolarStream)` contains behavior to pull data for customers using v1/customers
    `class Employees(SolarStream)` contains behavior to pull data for employees using v1/employees

    If some streams implement incremental sync, it is typical to create another class
    `class IncrementalSolarStream((SolarStream), ABC)` then have concrete stream implementations extend it. An example
    is provided below.

    See the reference docs for the full list of configurable options.
    """

    # TODO: Fill in the url base. Required.
    url_base = "https://api.le-systeme-solaire.net/rest/"
    
...
class SourceSolar(AbstractSource):
    def check_connection(self, logger, config) -> Tuple[bool, any]:
        """
        See https://github.com/airbytehq/airbyte/blob/master/airbyte-integrations/connectors/source-stripe/source_stripe/source.py#L232
        for an example.

        :param config:  the user-input config object conforming to the connector's spec.yaml
        :param logger:  logger object
        :return Tuple[bool, any]: (True, None) if the input config can be used to connect to the API successfully, (False, error) otherwise.
        """
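        try:
            # A timeout keeps the check from hanging on an unreachable host.
            resp = requests.get(SolarStream.url_base, timeout=30)
        except requests.RequestException as exc:
            return False, str(exc)
        status = resp.status_code
        logger.info(f"Ping response code: {status}")
        if status == 200:
            return True, None
        return False, resp.text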

Create the input file and run the check:

mkdir -p sample_files/secrets
echo '{"page_size": 10}' > ./sample_files/secrets/config.json
python main.py check --config ./sample_files/secrets/config.json

The following output indicates that the connection check succeeded:

{"type": "LOG", "log": {"level": "INFO", "message": "Ping response code: 200"}}
{"type": "LOG", "log": {"level": "INFO", "message": "Check succeeded"}}
{"type": "CONNECTION_STATUS", "connectionStatus": {"status": "SUCCEEDED"}}
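
The connector writes one JSON message per line, so the result of the check can also be read programmatically. Here is a small sketch that looks for the CONNECTION_STATUS message in output like the above. The function name connection_status is hypothetical, and the sample lines are copied from the log shown here.

```python
import json
from typing import Iterable, Optional


def connection_status(lines: Iterable[str]) -> Optional[str]:
    """Return the status from the first CONNECTION_STATUS message, if any."""
    for line in lines:
        line = line.strip()
        if not line:
            continue
        try:
            message = json.loads(line)
        except json.JSONDecodeError:
            # Ignore anything that is not a JSON message.
            continue
        if isinstance(message, dict) and message.get("type") == "CONNECTION_STATUS":
            return message["connectionStatus"]["status"]
    return None


if __name__ == "__main__":
    sample = [
        '{"type": "LOG", "log": {"level": "INFO", "message": "Check succeeded"}}',
        '{"type": "CONNECTION_STATUS", "connectionStatus": {"status": "SUCCEEDED"}}',
    ]
    print(connection_status(sample))
```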

Declare the streams#

Declaring a stream for an API endpoint takes only the following steps:

  1. Add an HttpStream class to ./source_solar/source.py
class SolarStream(HttpStream, ABC):
    """
    TODO remove this comment

    This class represents a stream output by the connector.
    This is an abstract base class meant to contain all the common functionality at the API level e.g: the API base URL, pagination strategy,
    parsing responses etc..

    Each stream should extend this class (or another abstract subclass of it) to specify behavior unique to that stream.

    Typically for REST APIs each stream corresponds to a resource in the API. For example if the API
    contains the endpoints
        - GET v1/customers
        - GET v1/employees

    then you should have three classes:
    `class SolarStream(HttpStream, ABC)` which is the current class
    `class Customers(SolarStream)` contains behavior to pull data for customers using v1/customers
    `class Employees(SolarStream)` contains behavior to pull data for employees using v1/employees

    If some streams implement incremental sync, it is typical to create another class
    `class IncrementalSolarStream((SolarStream), ABC)` then have concrete stream implementations extend it. An example
    is provided below.

    See the reference docs for the full list of configurable options.
    """

    # TODO: Fill in the url base. Required.
    url_base = "https://api.le-systeme-solaire.net/rest/"
    page = 1

    def __init__(self, config: Mapping[str, Any], **kwargs):
        super().__init__(**kwargs)
        self.page_size = config["page_size"]


    def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
        """
        TODO: Override this method to define a pagination strategy. If you will not be using pagination, no action is required - just return None.

        This method should return a Mapping (e.g: dict) containing whatever information required to make paginated requests. This dict is passed
        to most other methods in this class to help you form headers, request bodies, query params, etc..

        For example, if the API accepts a 'page' parameter to determine which page of the result to return, and a response from the API contains a
        'page' number, then this method should probably return a dict {'page': response.json()['page'] + 1} to increment the page count by 1.
        The request_params method should then read the input next_page_token and set the 'page' param to next_page_token['page'].

        :param response: the most recent response from the API
        :return If there is another page in the result, a mapping (e.g: dict) containing information needed to query the next page in the response.
                If there are no more pages in the result, return None.
        """
        return None

    def request_params(
        self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None
    ) -> MutableMapping[str, Any]:
        """
        TODO: Override this method to define any query parameters to be set. Remove this method if you don't need to define request params.
        Usually contains common params e.g. pagination size etc.
        """
        return {}

    def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
        """
        TODO: Override this method to define how a response is parsed.
        :return an iterable containing each record in the response
        """
        yield response.json()


class Bodies(SolarStream):

    primary_key = 'id'

    def path(self, *, stream_state: Mapping[str, Any] = None, stream_slice: Mapping[str, Any] = None,
             next_page_token: Mapping[str, Any] = None) -> str:
        return "bodies"

    def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
        response_data = response.json()
        if len(response_data["bodies"]) == self.page_size:
            self.page += 1
            return {"page": f"{self.page}, {self.page_size}"}

    def request_params(
        self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None
    ) -> MutableMapping[str, Any]:
        """
        TODO: Override this method to define any query parameters to be set. Remove this method if you don't need to define request params.
        Usually contains common params e.g. pagination size etc.
        """
        params = {
            "order": "id,desc",
            "page": f"{self.page}, {self.page_size}"
        }
        if next_page_token:
            params.update(next_page_token)
        return params
  2. Instantiate the class
from airbyte_cdk.sources.streams.http.auth import NoAuth

class SourceSolar(AbstractSource):
    def check_connection(self, logger, config) -> Tuple[bool, any]:
        """
        See https://github.com/airbytehq/airbyte/blob/master/airbyte-integrations/connectors/source-stripe/source_stripe/source.py#L232
        for an example.

        :param config:  the user-input config object conforming to the connector's spec.yaml
        :param logger:  logger object
        :return Tuple[bool, any]: (True, None) if the input config can be used to connect to the API successfully, (False, error) otherwise.
        """
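        try:
            # A timeout keeps the check from hanging on an unreachable host.
            resp = requests.get(SolarStream.url_base, timeout=30)
        except requests.RequestException as exc:
            return False, str(exc)
        status = resp.status_code
        logger.info(f"Ping response code: {status}")
        if status == 200:
            return True, None
        return False, resp.text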

    def streams(self, config: Mapping[str, Any]) -> List[Stream]:
        """
        TODO: Replace the streams below with your own streams.

        :param config: A Mapping of the user input configuration as defined in the connector spec.
        """
        # TODO remove the authenticator if not required.
        auth = NoAuth()
        return [Bodies(authenticator=auth, config=config)]
  3. Create bodies.json in ./source_solar/schemas
{
  "type": "object",
  "required": [
    "page_size"
  ],
  "properties": {
    "bodies": {
      "type": "array",
      "items": {
        "type": "object",
        "properties": {
          "id": {
            "type": "string"
          },
          "name": {
            "type": "string"
          },
          "englishName": {
            "type": "string"
          },
          "isPlanet": {
            "type": "boolean"
          },
          "moons": {
            "type": "array",
            "items": {
              "type": "object",
              "properties": {
                "moon": {
                  "type": "string"
                },
                "rel": {
                  "type": "string"
                }
              }
            }
          },
          "semimajorAxis": {
            "type": "number"
          },
          "perihelion": {
            "type": "number"
          },
          "aphelion": {
            "type": "number"
          },
          "eccentricity": {
            "type": "number"
          },
          "inclination": {
            "type": "number"
          },
          "mass": {
            "type": "object",
            "properties": {
              "massValue": {
                "type": "number"
              },
              "massExponent": {
                "type": "integer"
              }
            }
          },
          "vol": {
            "type": "object",
            "properties": {
              "volValue": {
                "type": "number"
              },
              "volExponent": {
                "type": "integer"
              }
            }
          },
          "density": {
            "type": "number"
          },
          "gravity": {
            "type": "number"
          },
          "escape": {
            "type": "number"
          },
          "meanRadius": {
            "type": "number"
          },
          "equaRadius": {
            "type": "number"
          },
          "polarRadius": {
            "type": "number"
          },
          "flattening": {
            "type": "number"
          },
          "dimension": {
            "type": "string"
          },
          "sideralOrbit": {
            "type": "number"
          },
          "sideralRotation": {
            "type": "number"
          },
          "aroundPlanet": {
            "type": "object",
            "properties": {
              "planet": {
                "type": "string"
              },
              "rel": {
                "type": "string"
              }
            }
          },
          "discoveredBy": {
            "type": "string"
          },
          "discoveryDate": {
            "type": "string"
          },
          "alternativeName": {
            "type": "string"
          },
          "axialTilt": {
            "type": "number"
          },
          "avgTemp": {
            "type": "number"
          },
          "mainAnomaly": {
            "type": "number"
          },
          "argPeriapsis": {
            "type": "number"
          },
          "longAscNode": {
            "type": "number"
          },
          "bodyType": {
            "type": "string"
          },
          "rel": {
            "type": "string"
          }
        }
      }
    }
  }
}
  4. Run discover to check the result
python main.py discover --config ./sample_files/secrets/config.json

The expected output is:

{"type": "CATALOG", "catalog": {"streams": [{"name": "bodies", "json_schema": {"type": "object", "required": ["page_size"], "properties": {"bodies": {"type": "array", "items": {"type": "object", "properties": {"id": {"type": "string"}, "name": {"type": "string"}, "englishName": {"type": "string"}, "isPlanet": {"type": "boolean"}, "moons": {"type": "array", "items": {"type": "object", "properties": {"moon": {"type": "string"}, "rel": {"type": "string"}}}}, "semimajorAxis": {"type": "number"}, "perihelion": {"type": "number"}, "aphelion": {"type": "number"}, "eccentricity": {"type": "number"}, "inclination": {"type": "number"}, "mass": {"type": "object", "properties": {"massValue": {"type": "number"}, "massExponent": {"type": "integer"}}}, "vol": {"type": "object", "properties": {"volValue": {"type": "number"}, "volExponent": {"type": "integer"}}}, "density": {"type": "number"}, "gravity": {"type": "number"}, "escape": {"type": "number"}, "meanRadius": {"type": "number"}, "equaRadius": {"type": "number"}, "polarRadius": {"type": "number"}, "flattening": {"type": "number"}, "dimension": {"type": "string"}, "sideralOrbit": {"type": "number"}, "sideralRotation": {"type": "number"}, "aroundPlanet": {"type": "object", "properties": {"planet": {"type": "string"}, "rel": {"type": "string"}}}, "discoveredBy": {"type": "string"}, "discoveryDate": {"type": "string"}, "alternativeName": {"type": "string"}, "axialTilt": {"type": "number"}, "avgTemp": {"type": "number"}, "mainAnomaly": {"type": "number"}, "argPeriapsis": {"type": "number"}, "longAscNode": {"type": "number"}, "bodyType": {"type": "string"}, "rel": {"type": "string"}}}}}}, "supported_sync_modes": ["full_refresh"], "source_defined_primary_key": [["id"]]}]}}
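
The pagination rule used by Bodies.next_page_token (keep going while a page comes back full) can be tried out without calling the API. The sketch below simulates it against an in-memory list. Both fetch_page and read_all are hypothetical stand-ins for the HTTP request and the CDK's read loop, not CDK functions.

```python
from typing import Any, Dict, List


def fetch_page(items: List[Any], page: int, page_size: int) -> Dict[str, List[Any]]:
    """Stand-in for the API call: return one page wrapped like the real response."""
    start = (page - 1) * page_size
    return {"bodies": items[start:start + page_size]}


def read_all(items: List[Any], page_size: int) -> List[Dict[str, List[Any]]]:
    """Request pages until one comes back with fewer than page_size entries."""
    pages = []
    page = 1
    while True:
        response = fetch_page(items, page, page_size)
        pages.append(response)
        # Same rule as next_page_token: a full page means there may be more.
        if len(response["bodies"]) != page_size:
            break
        page += 1
    return pages


if __name__ == "__main__":
    pages = read_all(list(range(25)), page_size=10)
    print([len(p["bodies"]) for p in pages])
```

One consequence of this rule is that when the total number of items is an exact multiple of the page size, one extra request is made and returns an empty page.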

Read data#

Create the read configuration:

vi ./sample_files/configured_catalog.json
{
  "streams": [
    {
      "stream": {
        "name": "bodies",
        "json_schema": {},
        "supported_sync_modes": [
          "full_refresh"
        ],
        "source_defined_primary_key": [
          [
            "id"
          ]
        ]
      },
      "sync_mode": "full_refresh",
      "destination_sync_mode": "overwrite"
    }
  ]
}

Run the read:

python main.py read --config ./sample_files/secrets/config.json --catalog ./sample_files/configured_catalog.json

The output is:

{"type": "LOG", "log": {"level": "INFO", "message": "Starting syncing SourceSolar"}}
{"type": "LOG", "log": {"level": "INFO", "message": "Syncing stream: bodies "}}
{"type": "RECORD", "record": {"stream": "bodies", "data": {"bodies": [{"id": "ymir", "name": "Ymir", "englishName": "Ymir", "isPlanet": false, "moons": null, "semimajorAxis": 23040000, "perihelion": 0, "aphelion": 0, "eccentricity": 0.187, "inclination": 167.9, "mass": {"massValue": 3.97, "massExponent": 15}, "vol": null, "density": 1.0, "gravity": 0.0, "escape": 0.0, "meanRadius": 9.0, "equaRadius": 0.0, "polarRadius": 0.0, "flattening": 0.0, "dimension": "", "sideralOrbit": 1315.4, "sideralRotation": 0.0, "aroundPlanet": {"planet": "saturne", "rel": "https://api.le-systeme-solaire.net/rest/bodies/saturne"}, "discoveredBy": "Brett J. Gladman", "discoveryDate": "07/08/2000", "alternativeName": "S/2000 S 1", "axialTilt": 0, "avgTemp": 0, "mainAnomaly": 0.0, "argPeriapsis": 0.0, "longAscNode": 0.0, "bodyType": "Moon", "rel": "https://api.le-systeme-solaire.net/rest/bodies/ymir"}, {"id": "weywot", "name": "Weywot", "englishName": "Weywot", "isPlanet": false, "moons": null, "semimajorAxis": 14500, "perihelion": 12470, "aphelion": 16530, "eccentricity": 0.148, "inclination": 14.0, "mass": null, "vol": null, "density": 1.0, "gravity": 0.0, "escape": 0.0, "meanRadius": 170.0, "equaRadius": 0.0, "polarRadius": 0.0, "flattening": 0.0, "dimension": "", "sideralOrbit": 12.438, "sideralRotation": 0.0, "aroundPlanet": {"planet": "quaoar", "rel": "https://api.le-systeme-solaire.net/rest/bodies/quaoar"}, "discoveredBy": "Michael E. Brown, T.A. 
Suer", "discoveryDate": "22/02/2007", "alternativeName": "S/2006 (50000) 1", "axialTilt": 0, "avgTemp": 0, "mainAnomaly": 0.0, "argPeriapsis": 0.0, "longAscNode": 0.0, "bodyType": "Moon", "rel": "https://api.le-systeme-solaire.net/rest/bodies/weywot"}, {"id": "vesta", "name": "(4) Vesta", "englishName": "4 Vesta", "isPlanet": false, "moons": null, "semimajorAxis": 353343000, "perihelion": 321767000, "aphelion": 384920000, "eccentricity": 0.0893, "inclination": 7.1337, "mass": {"massValue": 2.7, "massExponent": 20}, "vol": null, "density": 1.0, "gravity": 0.0, "escape": 0.0, "meanRadius": 265.0, "equaRadius": 0.0, "polarRadius": 0.0, "flattening": 0.0, "dimension": "560x544x454", "sideralOrbit": 1325.886, "sideralRotation": 0.0, "aroundPlanet": null, "discoveredBy": "H. W. Olbers", "discoveryDate": "29/03/1807", "alternativeName": "", "axialTilt": 0, "avgTemp": 0, "mainAnomaly": 0.0, "argPeriapsis": 0.0, "longAscNode": 0.0, "bodyType": "Asteroid", "rel": "https://api.le-systeme-solaire.net/rest/bodies/vesta"}, {"id": "venus", "name": "Vénus", "englishName": "Venus", "isPlanet": true, "moons": null, "semimajorAxis": 108208475, "perihelion": 107477000, "aphelion": 108939000, "eccentricity": 0.0067, "inclination": 3.39, "mass": {"massValue": 4.86747, "massExponent": 24}, "vol": {"volValue": 9.2843, "volExponent": 11}, "density": 5.243, "gravity": 8.87, "escape": 10360.0, "meanRadius": 6051.8, "equaRadius": 6051.8, "polarRadius": 6051.8, "flattening": 0.0, "dimension": "", "sideralOrbit": 224.701, "sideralRotation": -5832.5, "aroundPlanet": null, "discoveredBy": "", "discoveryDate": "", "alternativeName": "", "axialTilt": 177.36, "avgTemp": 737, "mainAnomaly": 50.115, "argPeriapsis": 54.78, "longAscNode": 76.785, "bodyType": "Planet", "rel": "https://api.le-systeme-solaire.net/rest/bodies/venus"}, {"id": "varuna", "name": "(20000) Varuna", "englishName": "20000 Varuna", "isPlanet": false, "moons": null, "semimajorAxis": 6451398000, "perihelion": 6120810000, "aphelion": 
6781985000, "eccentricity": 0.051, "inclination": 17.158, "mass": {"massValue": 1.55, "massExponent": 20}, "vol": null, "density": 1.0, "gravity": 0.0, "escape": 0.0, "meanRadius": 330.0, "equaRadius": 0.0, "polarRadius": 0.0, "flattening": 0.0, "dimension": "", "sideralOrbit": 103440.0, "sideralRotation": 0.0, "aroundPlanet": null, "discoveredBy": "Robert S. McMillan Spacewatch", "discoveryDate": "28/11/2000", "alternativeName": "2000 WR106", "axialTilt": 0, "avgTemp": 0, "mainAnomaly": 0.0, "argPeriapsis": 0.0, "longAscNode": 0.0, "bodyType": "Asteroid", "rel": "https://api.le-systeme-solaire.net/rest/bodies/varuna"}, {"id": "vanth", "name": "Vanth", "englishName": "Vanth", "isPlanet": false, "moons": null, "semimajorAxis": 9000, "perihelion": 0, "aphelion": 0, "eccentricity": 0.0009, "inclination": 105.03, "mass": {"massValue": 3.6, "massExponent": 19}, "vol": null, "density": 1.0, "gravity": 0.0, "escape": 0.0, "meanRadius": 434.0, "equaRadius": 0.0, "polarRadius": 0.0, "flattening": 0.0, "dimension": "", "sideralOrbit": 9.539, "sideralRotation": 0.0, "aroundPlanet": {"planet": "orcus", "rel": "https://api.le-systeme-solaire.net/rest/bodies/orcus"}, "discoveredBy": "Michael E. Brown, T.A. 
Suer", "discoveryDate": "13/11/2005", "alternativeName": "S/2005 (90482) 1", "axialTilt": 0, "avgTemp": 0, "mainAnomaly": 0.0, "argPeriapsis": 0.0, "longAscNode": 0.0, "bodyType": "Moon", "rel": "https://api.le-systeme-solaire.net/rest/bodies/vanth"}, {"id": "valetudo", "name": "Valétudo", "englishName": "Valetudo", "isPlanet": false, "moons": null, "semimajorAxis": 18928000, "perihelion": 0, "aphelion": 0, "eccentricity": 0.222, "inclination": 34.0, "mass": null, "vol": null, "density": 1.0, "gravity": 0.0, "escape": 0.0, "meanRadius": 0.5, "equaRadius": 0.0, "polarRadius": 0.0, "flattening": 0.0, "dimension": "", "sideralOrbit": 533.0, "sideralRotation": 0.0, "aroundPlanet": {"planet": "jupiter", "rel": "https://api.le-systeme-solaire.net/rest/bodies/jupiter"}, "discoveredBy": "Scott Sheppard", "discoveryDate": "17/07/2018", "alternativeName": "S/2016 J 2", "axialTilt": 0, "avgTemp": 0, "mainAnomaly": 0.0, "argPeriapsis": 0.0, "longAscNode": 0.0, "bodyType": "Moon", "rel": "https://api.le-systeme-solaire.net/rest/bodies/valetudo"}, {"id": "uranus", "name": "Uranus", "englishName": "Uranus", "isPlanet": true, "moons": [{"moon": "Ariel", "rel": "https://api.le-systeme-solaire.net/rest/bodies/ariel"}, {"moon": "Umbriel", "rel": "https://api.le-systeme-solaire.net/rest/bodies/umbriel"}, {"moon": "Titania", "rel": "https://api.le-systeme-solaire.net/rest/bodies/titania"}, {"moon": "Obéron", "rel": "https://api.le-systeme-solaire.net/rest/bodies/oberon"}, {"moon": "Miranda", "rel": "https://api.le-systeme-solaire.net/rest/bodies/miranda"}, {"moon": "Cordélia", "rel": "https://api.le-systeme-solaire.net/rest/bodies/cordelia"}, {"moon": "Ophélie", "rel": "https://api.le-systeme-solaire.net/rest/bodies/ophelia"}, {"moon": "Bianca", "rel": "https://api.le-systeme-solaire.net/rest/bodies/bianca"}, {"moon": "Cressida", "rel": "https://api.le-systeme-solaire.net/rest/bodies/cressida"}, {"moon": "Desdémone", "rel": "https://api.le-systeme-solaire.net/rest/bodies/desdemona"}, 
{"moon": "Juliette", "rel": "https://api.le-systeme-solaire.net/rest/bodies/juliet"}, {"moon": "Portia", "rel": "https://api.le-systeme-solaire.net/rest/bodies/portia"}, {"moon": "Rosalinde", "rel": "https://api.le-systeme-solaire.net/rest/bodies/rosalind"}, {"moon": "Belinda", "rel": "https://api.le-systeme-solaire.net/rest/bodies/belinda"}, {"moon": "Puck", "rel": "https://api.le-systeme-solaire.net/rest/bodies/puck"}, {"moon": "Caliban", "rel": "https://api.le-systeme-solaire.net/rest/bodies/caliban"}, {"moon": "Sycorax", "rel": "https://api.le-systeme-solaire.net/rest/bodies/sycorax"}, {"moon": "Prospero", "rel": "https://api.le-systeme-solaire.net/rest/bodies/prospero"}, {"moon": "Setebos", "rel": "https://api.le-systeme-solaire.net/rest/bodies/setebos"}, {"moon": "Stephano", "rel": "https://api.le-systeme-solaire.net/rest/bodies/stephano"}, {"moon": "Trinculo", "rel": "https://api.le-systeme-solaire.net/rest/bodies/trinculo"}, {"moon": "Francisco", "rel": "https://api.le-systeme-solaire.net/rest/bodies/francisco"}, {"moon": "Margaret", "rel": "https://api.le-systeme-solaire.net/rest/bodies/margaret"}, {"moon": "Ferdinand", "rel": "https://api.le-systeme-solaire.net/rest/bodies/ferdinand"}, {"moon": "Perdita", "rel": "https://api.le-systeme-solaire.net/rest/bodies/perdita"}, {"moon": "Mab", "rel": "https://api.le-systeme-solaire.net/rest/bodies/mab"}, {"moon": "Cupid", "rel": "https://api.le-systeme-solaire.net/rest/bodies/cupid"}], "semimajorAxis": 2870658186, "perihelion": 2734998229, "aphelion": 3006318143, "eccentricity": 0.0457, "inclination": 0.772, "mass": {"massValue": 8.68127, "massExponent": 25}, "vol": {"volValue": 6.833, "volExponent": 13}, "density": 1.27, "gravity": 8.87, "escape": 21380.0, "meanRadius": 25362.0, "equaRadius": 25559.0, "polarRadius": 24973.0, "flattening": 0.02293, "dimension": "", "sideralOrbit": 30685.4, "sideralRotation": -17.24, "aroundPlanet": null, "discoveredBy": "William Herschel", "discoveryDate": "13/03/1781", 
"alternativeName": "", "axialTilt": 97.77, "avgTemp": 76, "mainAnomaly": 142.2386, "argPeriapsis": 98.862, "longAscNode": 73.967, "bodyType": "Planet", "rel": "https://api.le-systeme-solaire.net/rest/bodies/uranus"}, {"id": "umbriel", "name": "Umbriel", "englishName": "Umbriel", "isPlanet": false, "moons": null, "semimajorAxis": 266000, "perihelion": 265100, "aphelion": 267500, "eccentricity": 0.0039, "inclination": 0.13, "mass": {"massValue": 12.2, "massExponent": 20}, "vol": null, "density": 1.46, "gravity": 0.0, "escape": 0.0, "meanRadius": 584.7, "equaRadius": 0.0, "polarRadius": 0.0, "flattening": 0.0, "dimension": "", "sideralOrbit": 4.14418, "sideralRotation": 99.499, "aroundPlanet": {"planet": "uranus", "rel": "https://api.le-systeme-solaire.net/rest/bodies/uranus"}, "discoveredBy": "William Lassell", "discoveryDate": "24/10/1851", "alternativeName": "", "axialTilt": 0, "avgTemp": 0, "mainAnomaly": 0.0, "argPeriapsis": 0.0, "longAscNode": 0.0, "bodyType": "Moon", "rel": "https://api.le-systeme-solaire.net/rest/bodies/umbriel"}, {"id": "triton", "name": "Triton", "englishName": "Triton", "isPlanet": false, "moons": null, "semimajorAxis": 354760, "perihelion": 354753, "aphelion": 354765, "eccentricity": 2e-05, "inclination": 157.345, "mass": {"massValue": 2.14, "massExponent": 22}, "vol": null, "density": 2.05, "gravity": 0.78, "escape": 0.0, "meanRadius": 1353.4, "equaRadius": 0.0, "polarRadius": 0.0, "flattening": 0.0, "dimension": "", "sideralOrbit": 5.87685, "sideralRotation": 141.0444, "aroundPlanet": {"planet": "neptune", "rel": "https://api.le-systeme-solaire.net/rest/bodies/neptune"}, "discoveredBy": "William Lassell", "discoveryDate": "10/10/1846", "alternativeName": "", "axialTilt": 0, "avgTemp": 0, "mainAnomaly": 0.0, "argPeriapsis": 0.0, "longAscNode": 0.0, "bodyType": "Moon", "rel": "https://api.le-systeme-solaire.net/rest/bodies/triton"}]}, "emitted_at": 1656152784004}}

...

{"type": "LOG", "log": {"level": "INFO", "message": "Read 29 records from bodies stream"}}
{"type": "LOG", "log": {"level": "INFO", "message": "Finished syncing bodies"}}
{"type": "LOG", "log": {"level": "INFO", "message": "SourceSolar runtimes:\nSyncing stream bodies 0:00:35.475559"}}
{"type": "LOG", "log": {"level": "INFO", "message": "Finished syncing SourceSolar"}}
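
Because the output is a stream of JSON messages, it is easy to summarize. Here is a minimal sketch that counts RECORD messages per stream. The function name count_records is hypothetical. Note that with the parse_response shown earlier, each bodies record holds one whole page of results rather than a single body.

```python
import json
from collections import Counter
from typing import Iterable


def count_records(lines: Iterable[str]) -> Counter:
    """Count RECORD messages per stream in connector output."""
    counts: Counter = Counter()
    for line in lines:
        try:
            message = json.loads(line)
        except json.JSONDecodeError:
            continue  # skip wrapped or non-JSON lines
        if isinstance(message, dict) and message.get("type") == "RECORD":
            counts[message["record"]["stream"]] += 1
    return counts


if __name__ == "__main__":
    sample = [
        '{"type": "LOG", "log": {"level": "INFO", "message": "Syncing stream: bodies "}}',
        '{"type": "RECORD", "record": {"stream": "bodies", "data": {"bodies": []}}}',
    ]
    print(dict(count_records(sample)))
```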

Handle dependencies between streams#

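  1. Create an HttpSubStream class
# Additional imports needed at the top of source.py
from airbyte_cdk.models import SyncMode
from airbyte_cdk.sources.streams.http import HttpSubStream


class SolarSubStream(HttpSubStream, SolarStream, ABC):
    # Do not raise on HTTP errors, so that invalid links are skipped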
    raise_on_http_errors = False

    def __init__(self, parent: SolarStream, **kwargs):
        super().__init__(parent=parent, **kwargs)

    def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
        """
        TODO: Override this method to define how a response is parsed.
        :return an iterable containing each record in the response
        """
        yield response.json()
  2. Create the detail endpoint class
class DetailOfBodies(SolarSubStream):
    primary_key = 'id'

    def path(self, *, stream_state: Mapping[str, Any] = None, stream_slice: Mapping[str, Any] = None,
             next_page_token: Mapping[str, Any] = None) -> str:
        return f'bodies/{stream_slice["id"]}'

    def stream_slices(self, **kwargs) -> Iterable[Optional[Mapping[str, any]]]:
        for stream_slices in self.parent.stream_slices(sync_mode=SyncMode.full_refresh):
            for record in self.parent.read_records(sync_mode=SyncMode.full_refresh, stream_slice=stream_slices):
                for data in record["bodies"]:
                    yield {"id": data["id"]}

    def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
        if response.status_code == 200:
            yield response.json()
        else:
            self.logger.warn(f"Get data failed: code={response.status_code}, url={response.url}, message={response.text}")
  3. Instantiate the detail stream
class SourceSolar(AbstractSource):
    def check_connection(self, logger, config) -> Tuple[bool, any]:
        """
        See https://github.com/airbytehq/airbyte/blob/master/airbyte-integrations/connectors/source-stripe/source_stripe/source.py#L232
        for an example.

        :param config:  the user-input config object conforming to the connector's spec.yaml
        :param logger:  logger object
        :return Tuple[bool, any]: (True, None) if the input config can be used to connect to the API successfully, (False, error) otherwise.
        """
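        try:
            # A timeout keeps the check from hanging on an unreachable host.
            resp = requests.get(SolarStream.url_base, timeout=30)
        except requests.RequestException as exc:
            return False, str(exc)
        status = resp.status_code
        logger.info(f"Ping response code: {status}")
        if status == 200:
            return True, None
        return False, resp.text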

    def streams(self, config: Mapping[str, Any]) -> List[Stream]:
        """
        TODO: Replace the streams below with your own streams.

        :param config: A Mapping of the user input configuration as defined in the connector spec.
        """
        # TODO remove the authenticator if not required.
        auth = NoAuth()
        return [Bodies(authenticator=auth, config=config),
                DetailOfBodies(parent=Bodies(authenticator=auth, config=config), authenticator=auth, config=config)]
  4. Declare the response schema
vi ./source_solar/schemas/detail_of_bodies.json
{
  "type": "object",
  "required": [
    "page_size"
  ],
  "properties": {
    "id": {
      "type": "string"
    },
    "name": {
      "type": "string"
    },
    "englishName": {
      "type": "string"
    },
    "isPlanet": {
      "type": "boolean"
    },
    "moons": {
      "type": "array",
      "items": {
        "type": "object",
        "properties": {
          "moon": {
            "type": "string"
          },
          "rel": {
            "type": "string"
          }
        }
      }
    },
    "semimajorAxis": {
      "type": "number"
    },
    "perihelion": {
      "type": "number"
    },
    "aphelion": {
      "type": "number"
    },
    "eccentricity": {
      "type": "number"
    },
    "inclination": {
      "type": "number"
    },
    "mass": {
      "type": "object",
      "properties": {
        "massValue": {
          "type": "number"
        },
        "massExponent": {
          "type": "integer"
        }
      }
    },
    "vol": {
      "type": "object",
      "properties": {
        "volValue": {
          "type": "number"
        },
        "volExponent": {
          "type": "integer"
        }
      }
    },
    "density": {
      "type": "number"
    },
    "gravity": {
      "type": "number"
    },
    "escape": {
      "type": "number"
    },
    "meanRadius": {
      "type": "number"
    },
    "equaRadius": {
      "type": "number"
    },
    "polarRadius": {
      "type": "number"
    },
    "flattening": {
      "type": "number"
    },
    "dimension": {
      "type": "string"
    },
    "sideralOrbit": {
      "type": "number"
    },
    "sideralRotation": {
      "type": "number"
    },
    "aroundPlanet": {
      "type": "object",
      "properties": {
        "planet": {
          "type": "string"
        },
        "rel": {
          "type": "string"
        }
      }
    },
    "discoveredBy": {
      "type": "string"
    },
    "discoveryDate": {
      "type": "string"
    },
    "alternativeName": {
      "type": "string"
    },
    "axialTilt": {
      "type": "number"
    },
    "avgTemp": {
      "type": "number"
    },
    "mainAnomaly": {
      "type": "number"
    },
    "argPeriapsis": {
      "type": "number"
    },
    "longAscNode": {
      "type": "number"
    },
    "bodyType": {
      "type": "string"
    }
  }
}
  5. Update the sync configuration ./sample_files/configured_catalog.json
{
  "streams": [
    {
      "stream": {
        "name": "bodies",
        "json_schema": {},
        "supported_sync_modes": [
          "full_refresh"
        ],
        "source_defined_primary_key": [
          [
            "id"
          ]
        ]
      },
      "sync_mode": "full_refresh",
      "destination_sync_mode": "overwrite"
    },
    {
      "stream": {
        "name": "detail_of_bodies",
        "json_schema": {},
        "supported_sync_modes": [
          "full_refresh"
        ],
        "source_defined_primary_key": [
          [
            "id"
          ]
        ]
      },
      "sync_mode": "full_refresh",
      "destination_sync_mode": "overwrite"
    }
  ]
}
  6. Test reading the data
python main.py read --config ./sample_files/secrets/config.json --catalog ./sample_files/configured_catalog.json

The final output shows that 284 records were read:

{"type": "LOG", "log": {"level": "INFO", "message": "Read 284 records from detail_of_bodies stream"}}
{"type": "LOG", "log": {"level": "INFO", "message": "Finished syncing detail_of_bodies"}}
{"type": "LOG", "log": {"level": "INFO", "message": "SourceSolar runtimes:\nSyncing stream bodies 0:00:51.038142\nSyncing stream detail_of_bodies 0:01:41.687393"}}
{"type": "LOG", "log": {"level": "INFO", "message": "Finished syncing SourceSolar"}}
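
The parent/child relationship used by DetailOfBodies comes down to turning every body in the parent's page records into one slice, and every slice into one request path. Here is a standalone sketch of that mapping, with no CDK involved. The function names slices_from_parent and detail_paths are hypothetical, and the sample ids are taken from the read output shown earlier.

```python
from typing import Any, Dict, Iterable, Iterator, List


def slices_from_parent(parent_records: Iterable[Dict[str, Any]]) -> Iterator[Dict[str, str]]:
    """Yield one slice per body found in the parent's page records."""
    for record in parent_records:
        # Each parent record is a whole page: {"bodies": [{...}, {...}]}.
        for body in record.get("bodies", []):
            yield {"id": body["id"]}


def detail_paths(parent_records: Iterable[Dict[str, Any]]) -> List[str]:
    """Build the request path for every slice, as DetailOfBodies.path does."""
    return [f'bodies/{s["id"]}' for s in slices_from_parent(parent_records)]


if __name__ == "__main__":
    pages = [
        {"bodies": [{"id": "ymir"}, {"id": "weywot"}]},
        {"bodies": [{"id": "vesta"}]},
    ]
    print(detail_paths(pages))
```

Since every slice triggers its own HTTP request, the detail stream issues one request per body, which is consistent with it taking noticeably longer than the bodies stream in the runtimes logged above.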

Deploy the custom connector#

  1. Build the connector's Docker image
docker build . -t airbyte/source-solar:dev
docker images

Output:

REPOSITORY                     TAG             IMAGE ID       CREATED          SIZE
airbyte/source-solar           dev             b42429012ce0   22 seconds ago   120MB
  2. Open the Airbyte UI at http://localhost:8000/ and create a new connector

image-20220626210721477

image-20220626211703757

Once created, it appears in the connector list:

image-20220626211844654

Use the custom connector#

Create a new connection:

image-20220626212028444

image-20220626212107982

image-20220626212142644

Run the sync:

image-20221203202150388

The sync succeeds:

image-20221203202516832

Check the database:

image-20221203202717347

Because we chose Normalized tabular data, the sync automatically expands the JSON objects, so several tables are created:

image-20221203202815537

Among them, solar_bodies corresponds to the raw data, and its bodies column holds the original response data:

image-20221203203008198

Summary#

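That concludes this introductory Airbyte tutorial. Compared with other ETL tools, Airbyte can run an ETL process directly against any service with clear API documentation, and combined with dbt it can also implement an ELT flow. The whole process only requires writing Python scripts, which makes it very flexible. Airbyte is still in its early days, but its future looks promising.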
