子文

Airbyte Practical Application

Airbyte is a relatively new open-source ETL tool, launched in July 2020. It differs from other ETL tools in that it ships out-of-the-box connectors, usable through both a UI and an API, and lets community developers monitor and maintain them. Connectors run as Docker containers and can be built in any language. By offering modular components and optional subsets of features, Airbyte provides greater flexibility.

Quick Start#

Local Deployment of Airbyte#

git clone https://github.com/airbytehq/airbyte.git
cd airbyte
docker-compose up

After starting, access: http://localhost:8000
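
The containers can take a while to come up. A minimal sketch for waiting until the UI answers, using only the standard library. The function names, retry count, and delay are arbitrary choices for illustration and are not part of Airbyte:

```python
import time
import urllib.error
import urllib.request
from typing import Callable


def http_probe(url: str) -> bool:
    """Return True if the URL answers with any HTTP response at all."""
    try:
        with urllib.request.urlopen(url, timeout=5):
            return True
    except urllib.error.HTTPError:
        # The server answered (for example with 401), so it is up.
        return True
    except (urllib.error.URLError, OSError):
        # Connection refused, DNS failure, timeout: not up yet.
        return False


def wait_until_up(url: str, probe: Callable[[str], bool] = http_probe,
                  attempts: int = 30, delay: float = 2.0) -> bool:
    """Poll `url` until `probe` succeeds or the attempts run out."""
    for attempt in range(attempts):
        if probe(url):
            return True
        if attempt < attempts - 1:
            time.sleep(delay)
    return False
```

Calling wait_until_up("http://localhost:8000") returns True as soon as the page responds and False after 30 unsuccessful probes.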

Add Data Source#

image-20220619122412393

Add Destination#

First, start a PostgreSQL database locally using docker-compose.

  1. mkdir ./data

  2. Create docker-compose.yaml:

    version: "3"
    services:
      postgres:
        image: postgres:14
        container_name: postgres
        restart: always
        environment:
          POSTGRES_DB: postgres
          POSTGRES_USER: postgres
          POSTGRES_PASSWORD: 123456
        ports:
          - 5432:5432
        volumes:
          - ./data:/var/lib/postgresql/data
    
  3. Run docker-compose up -d to start the database.

Then configure the destination in Airbyte.
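
As a rough sketch of what goes into that form, the docker-compose values above map to destination settings as follows. The field names (host, port, database, username, password, schema) are assumed from the Postgres destination form and may differ between Airbyte versions, and both helper names are hypothetical. Because Airbyte itself runs in containers, localhost may not reach the database in every setup, so the host is left as a parameter:

```python
from urllib.parse import quote


def postgres_destination_settings(host: str = "localhost") -> dict:
    """Collect the values from docker-compose.yaml as destination settings."""
    return {
        "host": host,
        "port": 5432,            # host side of the "5432:5432" mapping
        "database": "postgres",  # POSTGRES_DB
        "username": "postgres",  # POSTGRES_USER
        "password": "123456",    # POSTGRES_PASSWORD
        "schema": "public",      # assumption: PostgreSQL's default schema
    }


def as_uri(settings: dict) -> str:
    """Render the settings as a PostgreSQL connection URI for a quick manual test."""
    user = quote(settings["username"], safe="")
    password = quote(settings["password"], safe="")
    return (f"postgresql://{user}:{password}@{settings['host']}:"
            f"{settings['port']}/{settings['database']}")
```

The URI from as_uri can be passed to psql to confirm the database accepts connections before configuring Airbyte.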

image-20220619134741698

Establish Connection#

image-20220619134925755

image-20220619134942144

image-20220619135033394

Trigger Sync#

image-20220619135226032

View Results#

image-20220619135522388

Use Airbyte CDK to Develop Custom API Data Source#

Create Data Source Template#

cd airbyte-integrations/connector-templates/generator # assumes you are starting from the root of the Airbyte project.
# Install NPM from https://www.npmjs.com/get-npm if you don't have it
./generate.sh

Select Python HTTP API Source and enter the data source name; this walkthrough uses solar.

image-20220619140718389

Install Python Dependencies#

cd ../../connectors/source-solar
python -m venv .venv # Create a virtual environment in the .venv directory
source .venv/bin/activate # enable the venv
pip install -r requirements.txt

Execute Example#

python main.py spec

Output like the following indicates that the project was initialized successfully:

{"type": "SPEC", "spec": {"documentationUrl": "https://docsurl.com", "connectionSpecification": {"$schema": "http://json-schema.org/draft-07/schema#", "title": "Solar Spec", "type": "object", "required": ["TODO"], "additionalProperties": false, "properties": {"TODO": {"type": "string", "description": "describe me"}}}}}

Define Input Parameters#

The simplest approach is to create a spec.yaml file at source_<name>/spec.yaml that describes the connector's inputs according to the ConnectorSpecification schema. This is a good starting point when developing a source. Use JSON Schema to define the inputs (e.g., username and password).

vi ./source_solar/spec.yaml
documentationUrl: https://api.le-systeme-solaire.net/swagger/#/
connectionSpecification:
  $schema: http://json-schema.org/draft-07/schema#
  title: Solar System openData
  type: object
  required:
    - page_size
  additionalProperties: false
  properties:
    page_size:
      type: integer
      title: Page size
      description: Page size
      default: 10
      minimum: 1
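
To make the rules in this spec concrete, here is a hand-written check of the same constraints (required page_size, integer type, minimum of 1, no additional properties). It is only an illustration of what the schema expresses; validate_config is a hypothetical helper, not the validation code Airbyte runs:

```python
from typing import Any, List, Mapping


def validate_config(config: Mapping[str, Any]) -> List[str]:
    """Return a list of problems; an empty list means the config is acceptable."""
    errors = []
    if "page_size" not in config:
        errors.append("page_size is required")
    else:
        value = config["page_size"]
        # bool is a subclass of int in Python, so exclude it explicitly.
        if not isinstance(value, int) or isinstance(value, bool):
            errors.append("page_size must be an integer")
        elif value < 1:
            errors.append("page_size must be >= 1")
    # additionalProperties: false
    extra = sorted(set(config) - {"page_size"})
    if extra:
        errors.append(f"unexpected properties: {', '.join(extra)}")
    return errors
```

For example, validate_config({"page_size": 10}) returns an empty list, while validate_config({"page_size": 0}) reports that the value is below the minimum.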

Connection Health Check#

This operation verifies whether the configuration provided by the user can be used to connect to the underlying data source. That configuration contains the values described in spec.yaml. An API request would normally need credentials, but this API requires none, so the check is simple: request the base URL and confirm that it answers with status 200. Edit ./source_solar/source.py:

class SolarStream(HttpStream, ABC):
    """
    TODO remove this comment

    This class represents a stream output by the connector.
    This is an abstract base class meant to contain all the common functionality at the API level e.g: the API base URL, pagination strategy,
    parsing responses etc..

    Each stream should extend this class (or another abstract subclass of it) to specify behavior unique to that stream.

    Typically for REST APIs each stream corresponds to a resource in the API. For example if the API
    contains the endpoints
        - GET v1/customers
        - GET v1/employees

    then you should have three classes:
    `class SolarStream(HttpStream, ABC)` which is the current class
    `class Customers(SolarStream)` contains behavior to pull data for customers using v1/customers
    `class Employees(SolarStream)` contains behavior to pull data for employees using v1/employees

    If some streams implement incremental sync, it is typical to create another class
    `class IncrementalSolarStream((SolarStream), ABC)` then have concrete stream implementations extend it. An example
    is provided below.

    See the reference docs for the full list of configurable options.
    """

    # TODO: Fill in the url base. Required.
    url_base = "https://api.le-systeme-solaire.net/rest/"
    
...
class SourceSolar(AbstractSource):
    def check_connection(self, logger, config) -> Tuple[bool, any]:
        """
        See https://github.com/airbytehq/airbyte/blob/master/airbyte-integrations/connectors/source-stripe/source_stripe/source.py#L232
        for an example.

        :param config:  the user-input config object conforming to the connector's spec.yaml
        :param logger:  logger object
        :return Tuple[bool, any]: (True, None) if the input config can be used to connect to the API successfully, (False, error) otherwise.
        """
        resp = requests.get(SolarStream.url_base)
        status = resp.status_code
        logger.info(f"Ping response code: {status}")
        if status == 200:
            return True, None
        # Any other status is treated as a failed check; the body is the reason.
        return False, resp.text
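
The check above lets network errors (DNS failure, timeout) propagate as exceptions. A possible variant that still honors the (True, None) / (False, error) contract is sketched below. The function check_api and its injectable get parameter are hypothetical and exist only so that the logic can be tried without network access:

```python
from typing import Any, Callable, Optional, Tuple


def check_api(url: str, get: Callable[[str], Any]) -> Tuple[bool, Optional[str]]:
    """Return (True, None) when the API answers 200, else (False, reason)."""
    try:
        resp = get(url)
    except Exception as exc:  # e.g. DNS failure or timeout
        return False, f"Request failed: {exc!r}"
    if resp.status_code == 200:
        return True, None
    return False, f"HTTP {resp.status_code}: {resp.text}"
```

Inside check_connection this could be called as check_api(SolarStream.url_base, lambda u: requests.get(u, timeout=10)).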

Create an input file and execute the check.

mkdir -p sample_files/secrets
echo '{"page_size": 10}' > ./sample_files/secrets/config.json
python main.py check --config ./sample_files/secrets/config.json

Output like the following indicates that the connection check succeeded:

{"type": "LOG", "log": {"level": "INFO", "message": "Ping response code: 200"}}
{"type": "LOG", "log": {"level": "INFO", "message": "Check succeeded"}}
{"type": "CONNECTION_STATUS", "connectionStatus": {"status": "SUCCEEDED"}}
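
Each line the connector prints is one JSON message, so the result can also be picked out programmatically. A small sketch, assuming the message shapes shown in the output above (connection_status is a hypothetical helper name):

```python
import json
from typing import Iterable, Optional


def connection_status(lines: Iterable[str]) -> Optional[str]:
    """Return the status from the first CONNECTION_STATUS message, if any."""
    for line in lines:
        line = line.strip()
        if not line:
            continue
        try:
            message = json.loads(line)
        except json.JSONDecodeError:
            continue  # skip anything that is not a JSON message
        if not isinstance(message, dict):
            continue
        if message.get("type") == "CONNECTION_STATUS":
            return message["connectionStatus"]["status"]
    return None
```

The lines can come from a captured run of python main.py check, for instance via subprocess.run(..., capture_output=True, text=True).stdout.splitlines().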

Declare Data Stream#

Declaring a data stream for an API endpoint takes the following steps:

  1. Add an HttpStream class in ./source_solar/source.py.
class SolarStream(HttpStream, ABC):
    """
    TODO remove this comment

    This class represents a stream output by the connector.
    This is an abstract base class meant to contain all the common functionality at the API level e.g: the API base URL, pagination strategy,
    parsing responses etc..

    Each stream should extend this class (or another abstract subclass of it) to specify behavior unique to that stream.

    Typically for REST APIs each stream corresponds to a resource in the API. For example if the API
    contains the endpoints
        - GET v1/customers
        - GET v1/employees

    then you should have three classes:
    `class SolarStream(HttpStream, ABC)` which is the current class
    `class Customers(SolarStream)` contains behavior to pull data for customers using v1/customers
    `class Employees(SolarStream)` contains behavior to pull data for employees using v1/employees

    If some streams implement incremental sync, it is typical to create another class
    `class IncrementalSolarStream((SolarStream), ABC)` then have concrete stream implementations extend it. An example
    is provided below.

    See the reference docs for the full list of configurable options.
    """

    # TODO: Fill in the url base. Required.
    url_base = "https://api.le-systeme-solaire.net/rest/"
    page = 1

    def __init__(self, config: Mapping[str, Any], **kwargs):
        super().__init__(**kwargs)
        self.page_size = config["page_size"]

    def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
        """
        TODO: Override this method to define a pagination strategy. If you will not be using pagination, no action is required - just return None.

        This method should return a Mapping (e.g: dict) containing whatever information required to make paginated requests. This dict is passed
        to most other methods in this class to help you form headers, request bodies, query params, etc..

        For example, if the API accepts a 'page' parameter to determine which page of the result to return, and a response from the API contains a
        'page' number, then this method should probably return a dict {'page': response.json()['page'] + 1} to increment the page count by 1.
        The request_params method should then read the input next_page_token and set the 'page' param to next_page_token['page'].

        :param response: the most recent response from the API
        :return If there is another page in the result, a mapping (e.g: dict) containing information needed to query the next page in the response.
                If there are no more pages in the result, return None.
        """
        return None

    def request_params(
        self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, any] = None, next_page_token: Mapping[str, Any] = None
    ) -> MutableMapping[str, Any]:
        """
        TODO: Override this method to define any query parameters to be set. Remove this method if you don't need to define request params.
        Usually contains common params e.g. pagination size etc.
        """
        return {}

    def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
        """
        TODO: Override this method to define how a response is parsed.
        :return an iterable containing each record in the response
        """
        yield response.json()


class Bodies(SolarStream):

    primary_key = 'id'

    def path(self, *, stream_state: Mapping[str, Any] = None, stream_slice: Mapping[str, Any] = None,
             next_page_token: Mapping[str, Any] = None) -> str:
        return "bodies"

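    def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
        response_data = response.json()
        # A full page suggests more data may follow, so advance to the next page.
        if len(response_data["bodies"]) == self.page_size:
            self.page += 1
            return {"page": f"{self.page}, {self.page_size}"}
        # A short (or empty) page is the last one; returning None stops pagination.
        return None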

    def request_params(
        self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, any] = None, next_page_token: Mapping[str, Any] = None
    ) -> MutableMapping[str, Any]:
        """
        TODO: Override this method to define any query parameters to be set. Remove this method if you don't need to define request params.
        Usually contains common params e.g. pagination size etc.
        """
        params = {
            "order": "id,desc",
            "page": f"{self.page}, {self.page_size}"
        }
        if next_page_token:
            params.update(next_page_token)
        return params
  2. Instantiate this class.
from airbyte_cdk.sources.streams.http.auth import NoAuth

class SourceSolar(AbstractSource):
    def check_connection(self, logger, config) -> Tuple[bool, any]:
        """
        See https://github.com/airbytehq/airbyte/blob/master/airbyte-integrations/connectors/source-stripe/source_stripe/source.py#L232
        for an example.

        :param config:  the user-input config object conforming to the connector's spec.yaml
        :param logger:  logger object
        :return Tuple[bool, any]: (True, None) if the input config can be used to connect to the API successfully, (False, error) otherwise.
        """
        resp = requests.get(SolarStream.url_base)
        status = resp.status_code
        logger.info(f"Ping response code: {status}")
        if status == 200:
            return True, None
        # Any other status is treated as a failed check; the body is the reason.
        return False, resp.text

    def streams(self, config: Mapping[str, Any]) -> List[Stream]:
        """
        TODO: Replace the streams below with your own streams.

        :param config: A Mapping of the user input configuration as defined in the connector spec.
        """
        # TODO remove the authenticator if not required.
        auth = NoAuth()
        return [Bodies(authenticator=auth, config=config)]
  3. Create bodies.json in ./source_solar/schemas.
{
  "type": "object",
  "required": [
    "page_size"
  ],
  "properties": {
    "bodies": {
      "type": "array",
      "items": {
        "type": "object",
        "properties": {
          "id": {
            "type": "string"
          },
          "name": {
            "type": "string"
          },
          "englishName": {
            "type": "string"
          },
          "isPlanet": {
            "type": "boolean"
          },
          "moons": {
            "type": "array",
            "items": {
              "type": "object",
              "properties": {
                "moon": {
                  "type": "string"
                },
                "rel": {
                  "type": "string"
                }
              }
            }
          },
          "semimajorAxis": {
            "type": "number"
          },
          "perihelion": {
            "type": "number"
          },
          "aphelion": {
            "type": "number"
          },
          "eccentricity": {
            "type": "number"
          },
          "inclination": {
            "type": "number"
          },
          "mass": {
            "type": "object",
            "properties": {
              "massValue": {
                "type": "number"
              },
              "massExponent": {
                "type": "integer"
              }
            }
          },
          "vol": {
            "type": "object",
            "properties": {
              "volValue": {
                "type": "number"
              },
              "volExponent": {
                "type": "integer"
              }
            }
          },
          "density": {
            "type": "number"
          },
          "gravity": {
            "type": "number"
          },
          "escape": {
            "type": "number"
          },
          "meanRadius": {
            "type": "number"
          },
          "equaRadius": {
            "type": "number"
          },
          "polarRadius": {
            "type": "number"
          },
          "flattening": {
            "type": "number"
          },
          "dimension": {
            "type": "string"
          },
          "sideralOrbit": {
            "type": "number"
          },
          "sideralRotation": {
            "type": "number"
          },
          "aroundPlanet": {
            "type": "object",
            "properties": {
              "planet": {
                "type": "string"
              },
              "rel": {
                "type": "string"
              }
            }
          },
          "discoveredBy": {
            "type": "string"
          },
          "discoveryDate": {
            "type": "string"
          },
          "alternativeName": {
            "type": "string"
          },
          "axialTilt": {
            "type": "number"
          },
          "avgTemp": {
            "type": "number"
          },
          "mainAnomaly": {
            "type": "number"
          },
          "argPeriapsis": {
            "type": "number"
          },
          "longAscNode": {
            "type": "number"
          },
          "bodyType": {
            "type": "string"
          },
          "rel": {
            "type": "string"
          }
        }
      }
    }
  }
}
  4. Run discover to confirm the stream is declared.
python main.py discover --config ./sample_files/secrets/config.json

The expected output is:

{"type": "CATALOG", "catalog": {"streams": [{"name": "bodies", "json_schema": {"type": "object", "required": ["page_size"], "properties": {"bodies": {"type": "array", "items": {"type": "object", "properties": {"id": {"type": "string"}, "name": {"type": "string"}, "englishName": {"type": "string"}, "isPlanet": {"type": "boolean"}, "moons": {"type": "array", "items": {"type": "object", "properties": {"moon": {"type": "string"}, "rel": {"type": "string"}}}}, "semimajorAxis": {"type": "number"}, "perihelion": {"type": "number"}, "aphelion": {"type": "number"}, "eccentricity": {"type": "number"}, "inclination": {"type": "number"}, "mass": {"type": "object", "properties": {"massValue": {"type": "number"}, "massExponent": {"type": "integer"}}}, "vol": {"type": "object", "properties": {"volValue": {"type": "number"}, "volExponent": {"type": "integer"}}}, "density": {"type": "number"}, "gravity": {"type": "number"}, "escape": {"type": "number"}, "meanRadius": {"type": "number"}, "equaRadius": {"type": "number"}, "polarRadius": {"type": "number"}, "flattening": {"type": "number"}, "dimension": {"type": "string"}, "sideralOrbit": {"type": "number"}, "sideralRotation": {"type": "number"}, "aroundPlanet": {"type": "object", "properties": {"planet": {"type": "string"}, "rel": {"type": "string"}}}, "discoveredBy": {"type": "string"}, "discoveryDate": {"type": "string"}, "alternativeName": {"type": "string"}, "axialTilt": {"type": "number"}, "avgTemp": {"type": "number"}, "mainAnomaly": {"type": "number"}, "argPeriapsis": {"type": "number"}, "longAscNode": {"type": "number"}, "bodyType": {"type": "string"}, "rel": {"type": "string"}}}}}}, "supported_sync_modes": ["full_refresh"], "source_defined_primary_key": [["id"]]}]}}
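
The paging rule used by Bodies (keep requesting pages while the last one came back full) can be tried in isolation. The sketch below replaces the HTTP call with an in-memory stand-in; iter_pages and fake_fetch_factory are hypothetical names and the data is synthetic:

```python
from typing import Callable, Iterator, List


def iter_pages(fetch: Callable[[int, int], List[dict]],
               page_size: int) -> Iterator[List[dict]]:
    """Yield pages until one comes back shorter than page_size."""
    page = 1
    while True:
        items = fetch(page, page_size)
        yield items
        if len(items) < page_size:
            return
        page += 1


def fake_fetch_factory(total: int) -> Callable[[int, int], List[dict]]:
    """Build a stand-in for the API that serves `total` numbered items."""
    data = [{"id": str(i)} for i in range(total)]

    def fetch(page: int, page_size: int) -> List[dict]:
        start = (page - 1) * page_size
        return data[start:start + page_size]

    return fetch
```

With 25 items and a page size of 10 this yields pages of 10, 10, and 5 items. When the total is an exact multiple of the page size, the rule costs one extra request that returns an empty page.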

Read Data#

Create a read configuration.

vi ./sample_files/configured_catalog.json
{
  "streams": [
    {
      "stream": {
        "name": "bodies",
        "json_schema": {},
        "supported_sync_modes": [
          "full_refresh"
        ],
        "source_defined_primary_key": [
          [
            "id"
          ]
        ]
      },
      "sync_mode": "full_refresh",
      "destination_sync_mode": "overwrite"
    }
  ]
}
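
Instead of writing this file by hand, it can be derived from the CATALOG message that discover prints. A minimal sketch, assuming the message layout shown in the discover output; configure_catalog is a hypothetical helper. Unlike the hand-written file above, it carries over the full json_schema of each stream:

```python
import json
from typing import Any, Dict


def configure_catalog(catalog_message: Dict[str, Any],
                      sync_mode: str = "full_refresh",
                      destination_sync_mode: str = "overwrite") -> Dict[str, Any]:
    """Wrap every discovered stream in a configured-stream entry."""
    streams = catalog_message["catalog"]["streams"]
    return {
        "streams": [
            {
                "stream": stream,
                "sync_mode": sync_mode,
                "destination_sync_mode": destination_sync_mode,
            }
            for stream in streams
        ]
    }


def to_json(configured: Dict[str, Any]) -> str:
    """Serialize the configured catalog for writing to configured_catalog.json."""
    return json.dumps(configured, indent=2)
```

The input is the parsed CATALOG line, for example json.loads(line) on the discover output.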

Run the read.

python main.py read --config ./sample_files/secrets/config.json --catalog ./sample_files/configured_catalog.json

Output:

{"type": "LOG", "log": {"level": "INFO", "message": "Starting syncing SourceSolar"}}
{"type": "LOG", "log": {"level": "INFO", "message": "Syncing stream: bodies "}}
{"type": "RECORD", "record": {"stream": "bodies", "data": {"bodies": [{"id": "ymir", "name": "Ymir", "englishName": "Ymir", "isPlanet": false, "moons": null, "semimajorAxis": 23040000, "perihelion": 0, "aphelion": 0, "eccentricity": 0.187, "inclination": 167.9, "mass": {"massValue": 3.97, "massExponent": 15}, "vol": null, "density": 1.0, "gravity": 0.0, "escape": 0.0, "meanRadius": 9.0, "equaRadius": 0.0, "polarRadius": 0.0, "flattening": 0.0, "dimension": "", "sideralOrbit": 1315.4, "sideralRotation": 0.0, "aroundPlanet": {"planet": "saturne", "rel": "https://api.le-systeme-solaire.net/rest/bodies/saturne"}, "discoveredBy": "Brett J. Gladman", "discoveryDate": "07/08/2000", "alternativeName": "S/2000 S 1", "axialTilt": 0, "avgTemp": 0, "mainAnomaly": 0.0, "argPeriapsis": 0.0, "longAscNode": 0.0, "bodyType": "Moon", "rel": "https://api.le-systeme-solaire.net/rest/bodies/ymir"}, {"id": "weywot", "name": "Weywot", "englishName": "Weywot", "isPlanet": false, "moons": null, "semimajorAxis": 14500, "perihelion": 12470, "aphelion": 16530, "eccentricity": 0.148, "inclination": 14.0, "mass": null, "vol": null, "density": 1.0, "gravity": 0.0, "escape": 0.0, "meanRadius": 170.0, "equaRadius": 0.0, "polarRadius": 0.0, "flattening": 0.0, "dimension": "", "sideralOrbit": 12.438, "sideralRotation": 0.0, "aroundPlanet": {"planet": "quaoar", "rel": "https://api.le-systeme-solaire.net/rest/bodies/quaoar"}, "discoveredBy": "Michael E. Brown, T.A. 
Suer", "discoveryDate": "22/02/2007", "alternativeName": "S/2006 (50000) 1", "axialTilt": 0, "avgTemp": 0, "mainAnomaly": 0.0, "argPeriapsis": 0.0, "longAscNode": 0.0, "bodyType": "Moon", "rel": "https://api.le-systeme-solaire.net/rest/bodies/weywot"}, {"id": "vesta", "name": "(4) Vesta", "englishName": "4 Vesta", "isPlanet": false, "moons": null, "semimajorAxis": 353343000, "perihelion": 321767000, "aphelion": 384920000, "eccentricity": 0.0893, "inclination": 7.1337, "mass": {"massValue": 2.7, "massExponent": 20}, "vol": null, "density": 1.0, "gravity": 0.0, "escape": 0.0, "meanRadius": 265.0, "equaRadius": 0.0, "polarRadius": 0.0, "flattening": 0.0, "dimension": "560x544x454", "sideralOrbit": 1325.886, "sideralRotation": 0.0, "aroundPlanet": null, "discoveredBy": "H. W. Olbers", "discoveryDate": "29/03/1807", "alternativeName": "", "axialTilt": 0, "avgTemp": 0, "mainAnomaly": 0.0, "argPeriapsis": 0.0, "longAscNode": 0.0, "bodyType": "Asteroid", "rel": "https://api.le-systeme-solaire.net/rest/bodies/vesta"}, {"id": "venus", "name": "Vénus", "englishName": "Venus", "isPlanet": true, "moons": null, "semimajorAxis": 108208475, "perihelion": 107477000, "aphelion": 108939000, "eccentricity": 0.0067, "inclination": 3.39, "mass": {"massValue": 4.86747, "massExponent": 24}, "vol": {"volValue": 9.2843, "volExponent": 11}, "density": 5.243, "gravity": 8.87, "escape": 10360.0, "meanRadius": 6051.8, "equaRadius": 6051.8, "polarRadius": 6051.8, "flattening": 0.0, "dimension": "", "sideralOrbit": 224.701, "sideralRotation": -5832.5, "aroundPlanet": null, "discoveredBy": "", "discoveryDate": "", "alternativeName": "", "axialTilt": 177.36, "avgTemp": 737, "mainAnomaly": 50.115, "argPeriapsis": 54.78, "longAscNode": 76.785, "bodyType": "Planet", "rel": "https://api.le-systeme-solaire.net/rest/bodies/venus"}, {"id": "varuna", "name": "(20000) Varuna", "englishName": "20000 Varuna", "isPlanet": false, "moons": null, "semimajorAxis": 6451398000, "perihelion": 6120810000, "aphelion": 
6781985000, "eccentricity": 0.051, "inclination": 17.158, "mass": {"massValue": 1.55, "massExponent": 20}, "vol": null, "density": 1.0, "gravity": 0.0, "escape": 0.0, "meanRadius": 330.0, "equaRadius": 0.0, "polarRadius": 0.0, "flattening": 0.0, "dimension": "", "sideralOrbit": 103440.0, "sideralRotation": 0.0, "aroundPlanet": null, "discoveredBy": "Robert S. McMillan Spacewatch", "discoveryDate": "28/11/2000", "alternativeName": "2000 WR106", "axialTilt": 0, "avgTemp": 0, "mainAnomaly": 0.0, "argPeriapsis": 0.0, "longAscNode": 0.0, "bodyType": "Asteroid", "rel": "https://api.le-systeme-solaire.net/rest/bodies/varuna"}, {"id": "vanth", "name": "Vanth", "englishName": "Vanth", "isPlanet": false, "moons": null, "semimajorAxis": 9000, "perihelion": 0, "aphelion": 0, "eccentricity": 0.0009, "inclination": 105.03, "mass": {"massValue": 3.6, "massExponent": 19}, "vol": null, "density": 1.0, "gravity": 0.0, "escape": 0.0, "meanRadius": 434.0, "equaRadius": 0.0, "polarRadius": 0.0, "flattening": 0.0, "dimension": "", "sideralOrbit": 9.539, "sideralRotation": 0.0, "aroundPlanet": {"planet": "orcus", "rel": "https://api.le-systeme-solaire.net/rest/bodies/orcus"}, "discoveredBy": "Michael E. Brown, T.A. 
Suer", "discoveryDate": "13/11/2005", "alternativeName": "S/2005 (90482) 1", "axialTilt": 0, "avgTemp": 0, "mainAnomaly": 0.0, "argPeriapsis": 0.0, "longAscNode": 0.0, "bodyType": "Moon", "rel": "https://api.le-systeme-solaire.net/rest/bodies/vanth"}, {"id": "valetudo", "name": "Valétudo", "englishName": "Valetudo", "isPlanet": false, "moons": null, "semimajorAxis": 18928000, "perihelion": 0, "aphelion": 0, "eccentricity": 0.222, "inclination": 34.0, "mass": null, "vol": null, "density": 1.0, "gravity": 0.0, "escape": 0.0, "meanRadius": 0.5, "equaRadius": 0.0, "polarRadius": 0.0, "flattening": 0.0, "dimension": "", "sideralOrbit": 533.0, "sideralRotation": 0.0, "aroundPlanet": {"planet": "jupiter", "rel": "https://api.le-systeme-solaire.net/rest/bodies/jupiter"}, "discoveredBy": "Scott Sheppard", "discoveryDate": "17/07/2018", "alternativeName": "S/2016 J 2", "axialTilt": 0, "avgTemp": 0, "mainAnomaly": 0.0, "argPeriapsis": 0.0, "longAscNode": 0.0, "bodyType": "Moon", "rel": "https://api.le-systeme-solaire.net/rest/bodies/valetudo"}, {"id": "uranus", "name": "Uranus", "englishName": "Uranus", "isPlanet": true, "moons": [{"moon": "Ariel", "rel": "https://api.le-systeme-solaire.net/rest/bodies/ariel"}, {"moon": "Umbriel", "rel": "https://api.le-systeme-solaire.net/rest/bodies/umbriel"}, {"moon": "Titania", "rel": "https://api.le-systeme-solaire.net/rest/bodies/titania"}, {"moon": "Obéron", "rel": "https://api.le-systeme-solaire.net/rest/bodies/oberon"}, {"moon": "Miranda", "rel": "https://api.le-systeme-solaire.net/rest/bodies/miranda"}, {"moon": "Cordélia", "rel": "https://api.le-systeme-solaire.net/rest/bodies/cordelia"}, {"moon": "Ophélie", "rel": "https://api.le-systeme-solaire.net/rest/bodies/ophelia"}, {"moon": "Bianca", "rel": "https://api.le-systeme-solaire.net/rest/bodies/bianca"}, {"moon": "Cressida", "rel": "https://api.le-systeme-solaire.net/rest/bodies/cressida"}, {"moon": "Desdémone", "rel": "https://api.le-systeme-solaire.net/rest/bodies/desdemona"}, 
{"moon": "Juliette", "rel": "https://api.le-systeme-solaire.net/rest/bodies/juliet"}, {"moon": "Portia", "rel": "https://api.le-systeme-solaire.net/rest/bodies/portia"}, {"moon": "Rosalinde", "rel": "https://api.le-systeme-solaire.net/rest/bodies/rosalind"}, {"moon": "Belinda", "rel": "https://api.le-systeme-solaire.net/rest/bodies/belinda"}, {"moon": "Puck", "rel": "https://api.le-systeme-solaire.net/rest/bodies/puck"}, {"moon": "Caliban", "rel": "https://api.le-systeme-solaire.net/rest/bodies/caliban"}, {"moon": "Sycorax", "rel": "https://api.le-systeme-solaire.net/rest/bodies/sycorax"}, {"moon": "Prospero", "rel": "https://api.le-systeme-solaire.net/rest/bodies/prospero"}, {"moon": "Setebos", "rel": "https://api.le-systeme-solaire.net/rest/bodies/setebos"}, {"moon": "Stephano", "rel": "https://api.le-systeme-solaire.net/rest/bodies/stephano"}, {"moon": "Trinculo", "rel": "https://api.le-systeme-solaire.net/rest/bodies/trinculo"}, {"moon": "Francisco", "rel": "https://api.le-systeme-solaire.net/rest/bodies/francisco"}, {"moon": "Margaret", "rel": "https://api.le-systeme-solaire.net/rest/bodies/margaret"}, {"moon": "Ferdinand", "rel": "https://api.le-systeme-solaire.net/rest/bodies/ferdinand"}, {"moon": "Perdita", "rel": "https://api.le-systeme-solaire.net/rest/bodies/perdita"}, {"moon": "Mab", "rel": "https://api.le-systeme-solaire.net/rest/bodies/mab"}, {"moon": "Cupid", "rel": "https://api.le-systeme-solaire.net/rest/bodies/cupid"}], "semimajorAxis": 2870658186, "perihelion": 2734998229, "aphelion": 3006318143, "eccentricity": 0.0457, "inclination": 0.772, "mass": {"massValue": 8.68127, "massExponent": 25}, "vol": {"volValue": 6.833, "volExponent": 13}, "density": 1.27, "gravity": 8.87, "escape": 21380.0, "meanRadius": 25362.0, "equaRadius": 25559.0, "polarRadius": 24973.0, "flattening": 0.02293, "dimension": "", "sideralOrbit": 30685.4, "sideralRotation": -17.24, "aroundPlanet": null, "discoveredBy": "William Herschel", "discoveryDate": "13/03/1781", 
"alternativeName": "", "axialTilt": 97.77, "avgTemp": 76, "mainAnomaly": 142.2386, "argPeriapsis": 98.862, "longAscNode": 73.967, "bodyType": "Planet", "rel": "https://api.le-systeme-solaire.net/rest/bodies/uranus"}, {"id": "umbriel", "name": "Umbriel", "englishName": "Umbriel", "isPlanet": false, "moons": null, "semimajorAxis": 266000, "perihelion": 265100, "aphelion": 267500, "eccentricity": 0.0039, "inclination": 0.13, "mass": {"massValue": 12.2, "massExponent": 20}, "vol": null, "density": 1.46, "gravity": 0.0, "escape": 0.0, "meanRadius": 584.7, "equaRadius": 0.0, "polarRadius": 0.0, "flattening": 0.0, "dimension": "", "sideralOrbit": 4.14418, "sideralRotation": 99.499, "aroundPlanet": {"planet": "uranus", "rel": "https://api.le-systeme-solaire.net/rest/bodies/uranus"}, "discoveredBy": "William Lassell", "discoveryDate": "24/10/1851", "alternativeName": "", "axialTilt": 0, "avgTemp": 0, "mainAnomaly": 0.0, "argPeriapsis": 0.0, "longAscNode": 0.0, "bodyType": "Moon", "rel": "https://api.le-systeme-solaire.net/rest/bodies/umbriel"}, {"id": "triton", "name": "Triton", "englishName": "Triton", "isPlanet": false, "moons": null, "semimajorAxis": 354760, "perihelion": 354753, "aphelion": 354765, "eccentricity": 2e-05, "inclination": 157.345, "mass": {"massValue": 2.14, "massExponent": 22}, "vol": null, "density": 2.05, "gravity": 0.78, "escape": 0.0, "meanRadius": 1353.4, "equaRadius": 0.0, "polarRadius": 0.0, "flattening": 0.0, "dimension": "", "sideralOrbit": 5.87685, "sideralRotation": 141.0444, "aroundPlanet": {"planet": "neptune", "rel": "https://api.le-systeme-solaire.net/rest/bodies/neptune"}, "discoveredBy": "William Lassell", "discoveryDate": "10/10/1846", "alternativeName": "", "axialTilt": 0, "avgTemp": 0, "mainAnomaly": 0.0, "argPeriapsis": 0.0, "longAscNode": 0.0, "bodyType": "Moon", "rel": "https://api.le-systeme-solaire.net/rest/bodies/triton"}]}, "emitted_at": 1656152784004}}

...

{"type": "LOG", "log": {"level": "INFO", "message": "Read 29 records from bodies stream"}}
{"type": "LOG", "log": {"level": "INFO", "message": "Finished syncing bodies"}}
{"type": "LOG", "log": {"level": "INFO", "message": "SourceSolar runtimes:\nSyncing stream bodies 0:00:35.475559"}}
{"type": "LOG", "log": {"level": "INFO", "message": "Finished syncing SourceSolar"}}
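
Because parse_response yields response.json() unchanged, each RECORD above holds a whole page under the bodies key, which is presumably why the log counts pages rather than individual bodies. If one record per body is preferred, a possible alternative (not what the code in this walkthrough does) is to yield the array entries one by one. The helper name iter_bodies is hypothetical:

```python
from typing import Any, Iterable, Mapping


def iter_bodies(payload: Mapping[str, Any]) -> Iterable[Mapping[str, Any]]:
    """Yield each entry of the "bodies" array as its own record."""
    # A missing or null "bodies" key yields nothing instead of failing.
    for body in payload.get("bodies") or []:
        yield body
```

Inside parse_response this would read yield from iter_bodies(response.json()). The stream schema in bodies.json would then need to describe a single body rather than the wrapping object.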

Handle Dependencies#

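  1. Create an HttpSubStream class.
# Additional imports needed at the top of source.py
from airbyte_cdk.models import SyncMode
from airbyte_cdk.sources.streams.http import HttpSubStream


class SolarSubStream(HttpSubStream, SolarStream, ABC):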
    # Skip invalid links
    raise_on_http_errors = False

    def __init__(self, parent: SolarStream, **kwargs):
        super().__init__(parent=parent, **kwargs)

    def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
        """
        TODO: Override this method to define how a response is parsed.
        :return an iterable containing each record in the response
        """
        yield response.json()
  2. Create the detail stream class.
class DetailOfBodies(SolarSubStream):
    primary_key = 'id'

    def path(self, *, stream_state: Mapping[str, Any] = None, stream_slice: Mapping[str, Any] = None,
             next_page_token: Mapping[str, Any] = None) -> str:
        return f'bodies/{stream_slice["id"]}'

    def stream_slices(self, **kwargs) -> Iterable[Optional[Mapping[str, any]]]:
        for stream_slices in self.parent.stream_slices(sync_mode=SyncMode.full_refresh):
            for record in self.parent.read_records(sync_mode=SyncMode.full_refresh, stream_slice=stream_slices):
                for data in record["bodies"]:
                    yield {"id": data["id"]}

    def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
        if response.status_code == 200:
            yield response.json()
        else:
            self.logger.warning(f"Get data failed: code={response.status_code}, url={response.url}, message={response.text}")
  3. Instantiate the detail stream.
class SourceSolar(AbstractSource):
    def check_connection(self, logger, config) -> Tuple[bool, any]:
        """
        See https://github.com/airbytehq/airbyte/blob/master/airbyte-integrations/connectors/source-stripe/source_stripe/source.py#L232
        for an example.

        :param config:  the user-input config object conforming to the connector's spec.yaml
        :param logger:  logger object
        :return Tuple[bool, any]: (True, None) if the input config can be used to connect to the API successfully, (False, error) otherwise.
        """
        resp = requests.get(SolarStream.url_base)
        status = resp.status_code
        logger.info(f"Ping response code: {status}")
        if status == 200:
            return True, None
        # Any other status is treated as a failed check; the body is the reason.
        return False, resp.text

    def streams(self, config: Mapping[str, Any]) -> List[Stream]:
        """
        TODO: Replace the streams below with your own streams.

        :param config: A Mapping of the user input configuration as defined in the connector spec.
        """
        # TODO remove the authenticator if not required.
        auth = NoAuth()
        return [Bodies(authenticator=auth, config=config),
                DetailOfBodies(parent=Bodies(authenticator=auth, config=config), authenticator=auth, config=config)]
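
The check_connection method above calls requests.get without a timeout, and requests has no default timeout, so an unreachable API could block the check indefinitely. The following is a minimal sketch of a more defensive ping, not the connector's actual code. The function name ping, the FakeResponse class, and the example.invalid URL are all hypothetical; the HTTP getter is passed in as a parameter so that the sketch runs without network access, and in the connector it would be requests.get.

```python
from typing import Any, Callable, Optional, Tuple


def ping(get: Callable[..., Any], url: str, timeout: float = 10.0) -> Tuple[bool, Optional[str]]:
    """Return (True, None) on HTTP 200, otherwise (False, reason)."""
    try:
        resp = get(url, timeout=timeout)
    except Exception as exc:  # network errors and timeouts raised by the client
        return False, f"request failed: {exc}"
    if resp.status_code == 200:
        return True, None
    return False, resp.text


class FakeResponse:
    """Hypothetical stand-in for requests.Response, for this demo only."""

    def __init__(self, status_code: int, text: str = "") -> None:
        self.status_code = status_code
        self.text = text


def fake_ok(url: str, timeout: float) -> FakeResponse:
    return FakeResponse(200)


def fake_down(url: str, timeout: float) -> FakeResponse:
    raise TimeoutError("timed out")


ok_result = ping(fake_ok, "https://example.invalid/")
down_result = ping(fake_down, "https://example.invalid/")
print(ok_result)
print(down_result)
```

Returning the failure as a (False, message) tuple instead of letting the exception escape matches the return contract described in the check_connection docstring.
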
  1. Declare the response schema.
vi ./source_solar/schemas/detail_of_bodies.json
{
  "type": "object",
  "required": [
    "id"
  ],
  "properties": {
    "id": {
      "type": "string"
    },
    "name": {
      "type": "string"
    },
    "englishName": {
      "type": "string"
    },
    "isPlanet": {
      "type": "boolean"
    },
    "moons": {
      "type": "array",
      "items": {
        "type": "object",
        "properties": {
          "moon": {
            "type": "string"
          },
          "rel": {
            "type": "string"
          }
        }
      }
    },
    "semimajorAxis": {
      "type": "number"
    },
    "perihelion": {
      "type": "number"
    },
    "aphelion": {
      "type": "number"
    },
    "eccentricity": {
      "type": "number"
    },
    "inclination": {
      "type": "number"
    },
    "mass": {
      "type": "object",
      "properties": {
        "massValue": {
          "type": "number"
        },
        "massExponent": {
          "type": "integer"
        }
      }
    },
    "vol": {
      "type": "object",
      "properties": {
        "volValue": {
          "type": "number"
        },
        "volExponent": {
          "type": "integer"
        }
      }
    },
    "density": {
      "type": "number"
    },
    "gravity": {
      "type": "number"
    },
    "escape": {
      "type": "number"
    },
    "meanRadius": {
      "type": "number"
    },
    "equaRadius": {
      "type": "number"
    },
    "polarRadius": {
      "type": "number"
    },
    "flattening": {
      "type": "number"
    },
    "dimension": {
      "type": "string"
    },
    "sideralOrbit": {
      "type": "number"
    },
    "sideralRotation": {
      "type": "number"
    },
    "aroundPlanet": {
      "type": "object",
      "properties": {
        "planet": {
          "type": "string"
        },
        "rel": {
          "type": "string"
        }
      }
    },
    "discoveredBy": {
      "type": "string"
    },
    "discoveryDate": {
      "type": "string"
    },
    "alternativeName": {
      "type": "string"
    },
    "axialTilt": {
      "type": "number"
    },
    "avgTemp": {
      "type": "number"
    },
    "mainAnomaly": {
      "type": "number"
    },
    "argPeriapsis": {
      "type": "number"
    },
    "longAscNode": {
      "type": "number"
    },
    "bodyType": {
      "type": "string"
    }
  }
}
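
Before running a sync, it can help to compare a sample record against the declared types. The sketch below is a deliberately small type checker written for illustration, not a full JSON Schema validator (a dedicated validation library would be the usual choice). The function name type_errors and the sample record are hypothetical, and the schema is a trimmed copy of the one above.

```python
from typing import Any, List, Mapping

# Map JSON Schema type names to the Python types json.loads produces.
_PY_TYPES = {
    "string": str,
    "number": (int, float),
    "integer": int,
    "boolean": bool,
    "object": dict,
    "array": list,
}


def type_errors(value: Any, schema: Mapping[str, Any], path: str = "$") -> List[str]:
    """Return one message per value whose type differs from the schema."""
    errors: List[str] = []
    expected = schema.get("type")
    if expected is not None:
        # bool is a subclass of int in Python, so reject it for numeric types.
        is_bool = isinstance(value, bool)
        ok = isinstance(value, _PY_TYPES[expected]) and (expected == "boolean" or not is_bool)
        if not ok:
            return [f"{path}: expected {expected}, got {type(value).__name__}"]
    if isinstance(value, dict):
        for key, sub_schema in schema.get("properties", {}).items():
            if key in value:
                errors += type_errors(value[key], sub_schema, f"{path}.{key}")
    if isinstance(value, list) and "items" in schema:
        for i, item in enumerate(value):
            errors += type_errors(item, schema["items"], f"{path}[{i}]")
    return errors


schema = {
    "type": "object",
    "properties": {
        "id": {"type": "string"},
        "isPlanet": {"type": "boolean"},
        "mass": {
            "type": "object",
            "properties": {
                "massValue": {"type": "number"},
                "massExponent": {"type": "integer"},
            },
        },
    },
}
# Hypothetical record with one deliberate mistake: massExponent is a string.
record = {"id": "terre", "isPlanet": True, "mass": {"massValue": 5.97, "massExponent": "24"}}
errors = type_errors(record, schema)
print(errors)
```
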
  1. Update sync configuration in ./sample_files/configured_catalog.json.
{
  "streams": [
    {
      "stream": {
        "name": "bodies",
        "json_schema": {},
        "supported_sync_modes": [
          "full_refresh"
        ],
        "source_defined_primary_key": [
          [
            "id"
          ]
        ]
      },
      "sync_mode": "full_refresh",
      "destination_sync_mode": "overwrite"
    },
    {
      "stream": {
        "name": "detail_of_bodies",
        "json_schema": {},
        "supported_sync_modes": [
          "full_refresh"
        ],
        "source_defined_primary_key": [
          [
            "id"
          ]
        ]
      },
      "sync_mode": "full_refresh",
      "destination_sync_mode": "overwrite"
    }
  ]
}
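
A quick consistency check on the catalog can catch a mismatch before the read command runs. This is a minimal sketch, assuming only the catalog layout shown above; the function name catalog_problems is hypothetical, and the second stream in the sample is deliberately misconfigured to show a failure.

```python
from typing import Any, List, Mapping


def catalog_problems(catalog: Mapping[str, Any]) -> List[str]:
    """List configured streams whose sync_mode is not in supported_sync_modes."""
    problems: List[str] = []
    for entry in catalog.get("streams", []):
        stream = entry["stream"]
        supported = stream.get("supported_sync_modes", [])
        if entry["sync_mode"] not in supported:
            problems.append(
                f'{stream["name"]}: sync_mode {entry["sync_mode"]!r} not in {supported}'
            )
    return problems


# Trimmed sample; "incremental" is a deliberate mistake for demonstration.
sample_catalog = {
    "streams": [
        {
            "stream": {"name": "bodies", "supported_sync_modes": ["full_refresh"]},
            "sync_mode": "full_refresh",
            "destination_sync_mode": "overwrite",
        },
        {
            "stream": {"name": "detail_of_bodies", "supported_sync_modes": ["full_refresh"]},
            "sync_mode": "incremental",
            "destination_sync_mode": "overwrite",
        },
    ]
}
problems = catalog_problems(sample_catalog)
print(problems)
```

In practice the dictionary would come from json.load on ./sample_files/configured_catalog.json rather than from an inline literal.
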
  1. Test data reading.
python main.py read --config ./sample_files/secrets/config.json --catalog ./sample_files/configured_catalog.json

The final log lines show that 284 records were read from the detail_of_bodies stream.

{"type": "LOG", "log": {"level": "INFO", "message": "Read 284 records from detail_of_bodies stream"}}
{"type": "LOG", "log": {"level": "INFO", "message": "Finished syncing detail_of_bodies"}}
{"type": "LOG", "log": {"level": "INFO", "message": "SourceSolar runtimes:\nSyncing stream bodies 0:00:51.038142\nSyncing stream detail_of_bodies 0:01:41.687393"}}
{"type": "LOG", "log": {"level": "INFO", "message": "Finished syncing SourceSolar"}}
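
Because the connector writes one JSON message per line, the record counts can be extracted programmatically. The sketch below assumes the log message wording shown above ("Read N records from NAME stream"); the function name record_counts is hypothetical, and the sample lines are copied from the output above.

```python
import json
import re
from typing import Dict, Iterable

_READ_RE = re.compile(r"Read (\d+) records from (\w+) stream")


def record_counts(lines: Iterable[str]) -> Dict[str, int]:
    """Map stream name to the record count reported in LOG messages."""
    counts: Dict[str, int] = {}
    for line in lines:
        try:
            msg = json.loads(line)
        except json.JSONDecodeError:
            continue  # skip blank or non-JSON lines
        if not isinstance(msg, dict) or msg.get("type") != "LOG":
            continue
        match = _READ_RE.search(msg.get("log", {}).get("message", ""))
        if match:
            counts[match.group(2)] = int(match.group(1))
    return counts


sample_lines = [
    '{"type": "LOG", "log": {"level": "INFO", "message": "Read 284 records from detail_of_bodies stream"}}',
    '{"type": "LOG", "log": {"level": "INFO", "message": "Finished syncing detail_of_bodies"}}',
]
counts = record_counts(sample_lines)
print(counts)
```
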

Deploy Custom Connector#

  1. Create a connector Docker image.
docker build . -t airbyte/source-solar:dev
docker images

Output:

REPOSITORY                     TAG             IMAGE ID       CREATED          SIZE
airbyte/source-solar           dev             b42429012ce0   22 seconds ago   120MB
  1. Open the Airbyte page at http://localhost:8000/, and create a new connector.

image-20220626210721477

image-20220626211703757

After creation, it can be seen in the connector list.

image-20220626211844654

Use Custom Connector#

Create a new connection.

image-20220626212028444

image-20220626212107982

image-20220626212142644

Execute sync.

image-20221203202150388

Sync successful.

image-20221203202516832

View the database.

image-20221203202717347

Because we selected Normalized tabular data, the sync automatically expands the nested JSON objects, which produces multiple tables.

image-20221203202815537

Among them, the solar_bodies table corresponds to the raw data, and its bodies column holds the original response data.

image-20221203203008198

Summary#

This concludes the introductory Airbyte tutorial. Compared with other ETL tools, Airbyte can run the ETL process directly against any clearly documented API, and combined with dbt it can also support ELT workflows. Building a custom connector requires only a Python script, which makes the approach highly flexible. Airbyte is still in its infancy, but its future looks promising.
