Airbyte is a relatively new open-source ETL tool, launched in July 2020. It differs from other ETL tools in that it provides out-of-the-box connectors through both a UI and an API, which lets community developers monitor and maintain the tool. Connectors run as Docker containers and can be built in any language you choose. By providing modular components and optional subsets of features, Airbyte offers greater flexibility.
Quick Start#
Local Deployment of Airbyte#
git clone https://github.com/airbytehq/airbyte.git
cd airbyte
docker-compose up
After starting, access: http://localhost:8000
Add Data Source#
Add Destination#
First, start a PostgreSQL database locally using docker-compose.
- Create a data directory:
mkdir ./data
- Create docker-compose.yaml:
version: "3"
services:
  postgres:
    image: postgres:14
    container_name: postgres
    restart: always
    environment:
      POSTGRES_DB: postgres
      POSTGRES_USER: postgres
      POSTGRES_PASSWORD: 123456
    ports:
      - 5432:5432
    volumes:
      - ./data:/var/lib/postgresql/data
- Start the database:
docker-compose up -d
Then configure the destination in Airbyte.
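Before configuring the destination, it can help to confirm that PostgreSQL is actually listening on the published port. The following is a minimal sketch using only the standard library; the function name is_port_open is an assumption for illustration and is not part of Airbyte.

```python
import socket


def is_port_open(host: str, port: int, timeout: float = 2.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        # create_connection raises OSError on refusal, timeout, or DNS failure.
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False
```

With the compose file above, is_port_open("localhost", 5432) should return True once the container is up. This only checks that the port accepts connections, not that the credentials are valid.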
Establish Connection#
Trigger Sync#
View Results#
Use Airbyte CDK to Develop Custom API Data Source#
Create Data Source Template#
cd airbyte-integrations/connector-templates/generator # assumes you are starting from the root of the Airbyte project.
# Install NPM from https://www.npmjs.com/get-npm if you don't have it
./generate.sh
Select Python HTTP API Source and enter the data source name; this tutorial uses solar.
Install Python Dependencies#
cd ../../connectors/source-solar
python -m venv .venv # Create a virtual environment in the .venv directory
source .venv/bin/activate # enable the venv
pip install -r requirements.txt
Execute Example#
python main.py spec
The following output indicates that the project was initialized successfully:
{"type": "SPEC", "spec": {"documentationUrl": "https://docsurl.com", "connectionSpecification": {"$schema": "http://json-schema.org/draft-07/schema#", "title": "Solar Spec", "type": "object", "required": ["TODO"], "additionalProperties": false, "properties": {"TODO": {"type": "string", "description": "describe me"}}}}}
Define Input Parameters#
The simplest way to achieve this is to create a spec.yaml file at source_<name>/spec.yaml that describes the connector's input according to the ConnectorSpecification schema. This is a good starting point when developing the source code. Use JSON Schema to define what the inputs are (e.g., username and password).
vi ./source_solar/spec.yaml
documentationUrl: https://api.le-systeme-solaire.net/swagger/#/
connectionSpecification:
  $schema: http://json-schema.org/draft-07/schema#
  title: Solar System openData
  type: object
  required:
    - page_size
  additionalProperties: false
  properties:
    page_size:
      type: integer
      title: Page size
      description: Page size
      default: 10
      minimum: 1
      pattern: \d+
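To make the constraints in this spec concrete, here is a minimal hand-rolled sketch of the checks it implies for a config dictionary: page_size is required, must be an integer of at least 1, and no other properties are allowed. The function validate_config is hypothetical; the CDK performs its own validation against the spec, and this sketch does not reproduce that code.

```python
from typing import Any, List, Mapping


def validate_config(config: Mapping[str, Any]) -> List[str]:
    """Return a list of problems; an empty list means the config looks valid."""
    errors: List[str] = []
    # additionalProperties: false -> only page_size is allowed.
    extra = set(config) - {"page_size"}
    if extra:
        errors.append(f"unexpected properties: {sorted(extra)}")
    # required: [page_size]
    if "page_size" not in config:
        errors.append("page_size is required")
        return errors
    page_size = config["page_size"]
    # bool is a subclass of int in Python, so exclude it explicitly.
    if not isinstance(page_size, int) or isinstance(page_size, bool):
        errors.append("page_size must be an integer")
    elif page_size < 1:
        # minimum: 1
        errors.append("page_size must be >= 1")
    return errors
```

For example, validate_config({"page_size": 10}) returns an empty list, while a config with page_size set to 0 is reported as a violation of the minimum.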
Connection Health Check#
This operation verifies whether the user-provided input configuration can be used to connect to the underlying data source. Note that this user-provided configuration contains the values described in spec.yaml. To make a request to the API, we need to specify access permissions. In our case, this is a relatively simple check since the API does not require credentials. Edit ./source_solar/source.py:
class SolarStream(HttpStream, ABC):
    """
    TODO remove this comment
    This class represents a stream output by the connector.
    This is an abstract base class meant to contain all the common functionality at the API level e.g: the API base URL, pagination strategy,
    parsing responses etc..
    Each stream should extend this class (or another abstract subclass of it) to specify behavior unique to that stream.
    Typically for REST APIs each stream corresponds to a resource in the API. For example if the API
    contains the endpoints
        - GET v1/customers
        - GET v1/employees
    then you should have three classes:
    `class SolarStream(HttpStream, ABC)` which is the current class
    `class Customers(SolarStream)` contains behavior to pull data for customers using v1/customers
    `class Employees(SolarStream)` contains behavior to pull data for employees using v1/employees
    If some streams implement incremental sync, it is typical to create another class
    `class IncrementalSolarStream((SolarStream), ABC)` then have concrete stream implementations extend it. An example
    is provided below.
    See the reference docs for the full list of configurable options.
    """

    # TODO: Fill in the url base. Required.
    url_base = "https://api.le-systeme-solaire.net/rest/"
    ...


class SourceSolar(AbstractSource):
    def check_connection(self, logger, config) -> Tuple[bool, any]:
        """
        See https://github.com/airbytehq/airbyte/blob/master/airbyte-integrations/connectors/source-stripe/source_stripe/source.py#L232
        for an example.
        :param config: the user-input config object conforming to the connector's spec.yaml
        :param logger: logger object
        :return Tuple[bool, any]: (True, None) if the input config can be used to connect to the API successfully, (False, error) otherwise.
        """
        resp = requests.get(SolarStream.url_base)
        status = resp.status_code
        logger.info(f"Ping response code: {status}")
        if status == 200:
            return True, None
        else:
            message = resp.text
            return False, message
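The check above lets network exceptions (for example a timeout or DNS failure) propagate. One possible variant, sketched here under the assumption that such failures should be reported as a failed check rather than raised, separates the decision logic from the HTTP call. The name check_with and the injected fetch_status callable are hypothetical and exist only so the logic can be exercised without network access.

```python
from typing import Callable, Optional, Tuple


def check_with(fetch_status: Callable[[], Tuple[int, str]]) -> Tuple[bool, Optional[str]]:
    """Return (True, None) on HTTP 200, otherwise (False, reason)."""
    try:
        status, body = fetch_status()
    except Exception as exc:  # e.g. connection refused, DNS failure, timeout
        return False, f"request failed: {exc}"
    if status == 200:
        return True, None
    # Mirror the tutorial's behavior: use the response body as the error message.
    return False, body
```

Inside check_connection, fetch_status could be a small function that calls requests.get on the base URL and returns its status_code and text.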
Create an input file and execute the check.
mkdir -p sample_files/secrets
echo '{"page_size": 10}' > ./sample_files/secrets/config.json
python main.py check --config ./sample_files/secrets/config.json
Displaying the following indicates that the connection check was successful:
{"type": "LOG", "log": {"level": "INFO", "message": "Ping response code: 200"}}
{"type": "LOG", "log": {"level": "INFO", "message": "Check succeeded"}}
{"type": "CONNECTION_STATUS", "connectionStatus": {"status": "SUCCEEDED"}}
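Each line the connector prints is a JSON message with a type field, as in the output above. If you want to check the result from a script rather than by eye, a minimal sketch along these lines could extract the status; connection_status is a hypothetical helper, not part of the CDK.

```python
import json
from typing import Iterable, Optional


def connection_status(lines: Iterable[str]) -> Optional[str]:
    """Return the status of the first CONNECTION_STATUS message, or None."""
    for line in lines:
        line = line.strip()
        if not line:
            continue
        try:
            message = json.loads(line)
        except json.JSONDecodeError:
            continue  # skip anything that is not a JSON message
        if isinstance(message, dict) and message.get("type") == "CONNECTION_STATUS":
            return message["connectionStatus"]["status"]
    return None
```

The lines could come from a file containing the captured output or from a subprocess running the check command.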
Declare Data Stream#
To declare a data stream for an interface, we only need the following steps:
- Add an HttpStream class in ./source_solar/source.py.
class SolarStream(HttpStream, ABC):
    """
    TODO remove this comment
    This class represents a stream output by the connector.
    This is an abstract base class meant to contain all the common functionality at the API level e.g: the API base URL, pagination strategy,
    parsing responses etc..
    Each stream should extend this class (or another abstract subclass of it) to specify behavior unique to that stream.
    Typically for REST APIs each stream corresponds to a resource in the API. For example if the API
    contains the endpoints
        - GET v1/customers
        - GET v1/employees
    then you should have three classes:
    `class SolarStream(HttpStream, ABC)` which is the current class
    `class Customers(SolarStream)` contains behavior to pull data for customers using v1/customers
    `class Employees(SolarStream)` contains behavior to pull data for employees using v1/employees
    If some streams implement incremental sync, it is typical to create another class
    `class IncrementalSolarStream((SolarStream), ABC)` then have concrete stream implementations extend it. An example
    is provided below.
    See the reference docs for the full list of configurable options.
    """

    # TODO: Fill in the url base. Required.
    url_base = "https://api.le-systeme-solaire.net/rest/"
    page = 1

    def __init__(self, config: Mapping[str, Any], **kwargs):
        super().__init__(**kwargs)
        self.page_size = config["page_size"]

    def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
        """
        TODO: Override this method to define a pagination strategy. If you will not be using pagination, no action is required - just return None.
        This method should return a Mapping (e.g: dict) containing whatever information required to make paginated requests. This dict is passed
        to most other methods in this class to help you form headers, request bodies, query params, etc..
        For example, if the API accepts a 'page' parameter to determine which page of the result to return, and a response from the API contains a
        'page' number, then this method should probably return a dict {'page': response.json()['page'] + 1} to increment the page count by 1.
        The request_params method should then read the input next_page_token and set the 'page' param to next_page_token['page'].
        :param response: the most recent response from the API
        :return If there is another page in the result, a mapping (e.g: dict) containing information needed to query the next page in the response.
                If there are no more pages in the result, return None.
        """
        return None

    def request_params(
        self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, any] = None, next_page_token: Mapping[str, Any] = None
    ) -> MutableMapping[str, Any]:
        """
        TODO: Override this method to define any query parameters to be set. Remove this method if you don't need to define request params.
        Usually contains common params e.g. pagination size etc.
        """
        return {}

    def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
        """
        TODO: Override this method to define how a response is parsed.
        :return an iterable containing each record in the response
        """
        yield response.json()


class Bodies(SolarStream):
    primary_key = 'id'

    def path(self, *, stream_state: Mapping[str, Any] = None, stream_slice: Mapping[str, Any] = None,
             next_page_token: Mapping[str, Any] = None) -> str:
        return "bodies"

    def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
        response_data = response.json()
        # A full page suggests more data may follow; a short page ends pagination (implicit None).
        if len(response_data["bodies"]) == self.page_size:
            self.page += 1
            return {"page": f"{self.page}, {self.page_size}"}

    def request_params(
        self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, any] = None, next_page_token: Mapping[str, Any] = None
    ) -> MutableMapping[str, Any]:
        """
        TODO: Override this method to define any query parameters to be set. Remove this method if you don't need to define request params.
        Usually contains common params e.g. pagination size etc.
        """
        params = {
            "order": "id,desc",
            "page": f"{self.page}, {self.page_size}"
        }
        if next_page_token:
            params.update(next_page_token)
        return params
- Instantiate this class.
from airbyte_cdk.sources.streams.http.auth import NoAuth


class SourceSolar(AbstractSource):
    def check_connection(self, logger, config) -> Tuple[bool, any]:
        """
        See https://github.com/airbytehq/airbyte/blob/master/airbyte-integrations/connectors/source-stripe/source_stripe/source.py#L232
        for an example.
        :param config: the user-input config object conforming to the connector's spec.yaml
        :param logger: logger object
        :return Tuple[bool, any]: (True, None) if the input config can be used to connect to the API successfully, (False, error) otherwise.
        """
        resp = requests.get(SolarStream.url_base)
        status = resp.status_code
        logger.info(f"Ping response code: {status}")
        if status == 200:
            return True, None
        else:
            message = resp.text
            return False, message

    def streams(self, config: Mapping[str, Any]) -> List[Stream]:
        """
        TODO: Replace the streams below with your own streams.
        :param config: A Mapping of the user input configuration as defined in the connector spec.
        """
        # TODO remove the authenticator if not required.
        auth = NoAuth()
        return [Bodies(authenticator=auth, config=config)]
- Create bodies.json in ./source_solar/schemas/.
{
"type": "object",
"required": [
"page_size"
],
"properties": {
"bodies": {
"type": "array",
"items": {
"type": "object",
"properties": {
"id": {
"type": "string"
},
"name": {
"type": "string"
},
"englishName": {
"type": "string"
},
"isPlanet": {
"type": "boolean"
},
"moons": {
"type": "array",
"items": {
"type": "object",
"properties": {
"moon": {
"type": "string"
},
"rel": {
"type": "string"
}
}
}
},
"semimajorAxis": {
"type": "number"
},
"perihelion": {
"type": "number"
},
"aphelion": {
"type": "number"
},
"eccentricity": {
"type": "number"
},
"inclination": {
"type": "number"
},
"mass": {
"type": "object",
"properties": {
"massValue": {
"type": "number"
},
"massExponent": {
"type": "integer"
}
}
},
"vol": {
"type": "object",
"properties": {
"volValue": {
"type": "number"
},
"volExponent": {
"type": "integer"
}
}
},
"density": {
"type": "number"
},
"gravity": {
"type": "number"
},
"escape": {
"type": "number"
},
"meanRadius": {
"type": "number"
},
"equaRadius": {
"type": "number"
},
"polarRadius": {
"type": "number"
},
"flattening": {
"type": "number"
},
"dimension": {
"type": "string"
},
"sideralOrbit": {
"type": "number"
},
"sideralRotation": {
"type": "number"
},
"aroundPlanet": {
"type": "object",
"properties": {
"planet": {
"type": "string"
},
"rel": {
"type": "string"
}
}
},
"discoveredBy": {
"type": "string"
},
"discoveryDate": {
"type": "string"
},
"alternativeName": {
"type": "string"
},
"axialTilt": {
"type": "number"
},
"avgTemp": {
"type": "number"
},
"mainAnomaly": {
"type": "number"
},
"argPeriapsis": {
"type": "number"
},
"longAscNode": {
"type": "number"
},
"bodyType": {
"type": "string"
},
"rel": {
"type": "string"
}
}
}
}
}
}
- Execute the check.
python main.py discover --config ./sample_files/secrets/config.json
The correct output is:
{"type": "CATALOG", "catalog": {"streams": [{"name": "bodies", "json_schema": {"type": "object", "required": ["page_size"], "properties": {"bodies": {"type": "array", "items": {"type": "object", "properties": {"id": {"type": "string"}, "name": {"type": "string"}, "englishName": {"type": "string"}, "isPlanet": {"type": "boolean"}, "moons": {"type": "array", "items": {"type": "object", "properties": {"moon": {"type": "string"}, "rel": {"type": "string"}}}}, "semimajorAxis": {"type": "number"}, "perihelion": {"type": "number"}, "aphelion": {"type": "number"}, "eccentricity": {"type": "number"}, "inclination": {"type": "number"}, "mass": {"type": "object", "properties": {"massValue": {"type": "number"}, "massExponent": {"type": "integer"}}}, "vol": {"type": "object", "properties": {"volValue": {"type": "number"}, "volExponent": {"type": "integer"}}}, "density": {"type": "number"}, "gravity": {"type": "number"}, "escape": {"type": "number"}, "meanRadius": {"type": "number"}, "equaRadius": {"type": "number"}, "polarRadius": {"type": "number"}, "flattening": {"type": "number"}, "dimension": {"type": "string"}, "sideralOrbit": {"type": "number"}, "sideralRotation": {"type": "number"}, "aroundPlanet": {"type": "object", "properties": {"planet": {"type": "string"}, "rel": {"type": "string"}}}, "discoveredBy": {"type": "string"}, "discoveryDate": {"type": "string"}, "alternativeName": {"type": "string"}, "axialTilt": {"type": "number"}, "avgTemp": {"type": "number"}, "mainAnomaly": {"type": "number"}, "argPeriapsis": {"type": "number"}, "longAscNode": {"type": "number"}, "bodyType": {"type": "string"}, "rel": {"type": "string"}}}}}}, "supported_sync_modes": ["full_refresh"], "source_defined_primary_key": [["id"]]}]}}
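The pagination rule used by Bodies.next_page_token in the steps above can be sketched as a pure function: a full page means another page is requested, and a short page ends the stream. The helper names next_page and count_requests are hypothetical, and the simulation assumes the API returns items in fixed-size pages; the "page, size" string format is copied from the tutorial code.

```python
from typing import Any, Mapping, Optional


def next_page(current_page: int, records_on_page: int, page_size: int) -> Optional[Mapping[str, Any]]:
    """Return the token for the next page, or None when the last page was short."""
    if records_on_page == page_size:
        return {"page": f"{current_page + 1}, {page_size}"}
    return None


def count_requests(total_items: int, page_size: int) -> int:
    """Simulate paging through total_items and count the requests made."""
    page, requests_made = 1, 0
    while True:
        start = (page - 1) * page_size
        on_page = max(0, min(page_size, total_items - start))
        requests_made += 1
        if next_page(page, on_page, page_size) is None:
            return requests_made
        page += 1
```

One consequence of this rule is that when the total is an exact multiple of the page size, one extra request is made that returns an empty page before pagination stops.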
Read Data#
Create a read configuration.
vi ./sample_files/configured_catalog.json
{
"streams": [
{
"stream": {
"name": "bodies",
"json_schema": {},
"supported_sync_modes": [
"full_refresh"
],
"source_defined_primary_key": [
[
"id"
]
]
},
"sync_mode": "full_refresh",
"destination_sync_mode": "overwrite"
}
]
}
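If you prefer to generate this file instead of writing it by hand, a small script could build the same structure from a list of stream names. This is a sketch only: build_configured_catalog is a hypothetical helper, and it assumes every stream uses full_refresh with overwrite and id as its primary key, as in the file above.

```python
import json
from typing import Any, Dict, List


def build_configured_catalog(stream_names: List[str]) -> Dict[str, Any]:
    """Build a configured catalog with one full_refresh/overwrite entry per stream."""
    return {
        "streams": [
            {
                "stream": {
                    "name": name,
                    "json_schema": {},
                    "supported_sync_modes": ["full_refresh"],
                    "source_defined_primary_key": [["id"]],
                },
                "sync_mode": "full_refresh",
                "destination_sync_mode": "overwrite",
            }
            for name in stream_names
        ]
    }


def catalog_as_json(stream_names: List[str]) -> str:
    """Serialize the catalog so it can be written to configured_catalog.json."""
    return json.dumps(build_configured_catalog(stream_names), indent=2)
```

Writing the result of catalog_as_json(["bodies"]) to ./sample_files/configured_catalog.json should produce a file equivalent to the one shown above.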
Run data reading.
python main.py read --config ./sample_files/secrets/config.json --catalog ./sample_files/configured_catalog.json
Output:
{"type": "LOG", "log": {"level": "INFO", "message": "Starting syncing SourceSolar"}}
{"type": "LOG", "log": {"level": "INFO", "message": "Syncing stream: bodies "}}
{"type": "RECORD", "record": {"stream": "bodies", "data": {"bodies": [{"id": "ymir", "name": "Ymir", "englishName": "Ymir", "isPlanet": false, "moons": null, "semimajorAxis": 23040000, "perihelion": 0, "aphelion": 0, "eccentricity": 0.187, "inclination": 167.9, "mass": {"massValue": 3.97, "massExponent": 15}, "vol": null, "density": 1.0, "gravity": 0.0, "escape": 0.0, "meanRadius": 9.0, "equaRadius": 0.0, "polarRadius": 0.0, "flattening": 0.0, "dimension": "", "sideralOrbit": 1315.4, "sideralRotation": 0.0, "aroundPlanet": {"planet": "saturne", "rel": "https://api.le-systeme-solaire.net/rest/bodies/saturne"}, "discoveredBy": "Brett J. Gladman", "discoveryDate": "07/08/2000", "alternativeName": "S/2000 S 1", "axialTilt": 0, "avgTemp": 0, "mainAnomaly": 0.0, "argPeriapsis": 0.0, "longAscNode": 0.0, "bodyType": "Moon", "rel": "https://api.le-systeme-solaire.net/rest/bodies/ymir"}, {"id": "weywot", "name": "Weywot", "englishName": "Weywot", "isPlanet": false, "moons": null, "semimajorAxis": 14500, "perihelion": 12470, "aphelion": 16530, "eccentricity": 0.148, "inclination": 14.0, "mass": null, "vol": null, "density": 1.0, "gravity": 0.0, "escape": 0.0, "meanRadius": 170.0, "equaRadius": 0.0, "polarRadius": 0.0, "flattening": 0.0, "dimension": "", "sideralOrbit": 12.438, "sideralRotation": 0.0, "aroundPlanet": {"planet": "quaoar", "rel": "https://api.le-systeme-solaire.net/rest/bodies/quaoar"}, "discoveredBy": "Michael E. Brown, T.A. 
Suer", "discoveryDate": "22/02/2007", "alternativeName": "S/2006 (50000) 1", "axialTilt": 0, "avgTemp": 0, "mainAnomaly": 0.0, "argPeriapsis": 0.0, "longAscNode": 0.0, "bodyType": "Moon", "rel": "https://api.le-systeme-solaire.net/rest/bodies/weywot"}, {"id": "vesta", "name": "(4) Vesta", "englishName": "4 Vesta", "isPlanet": false, "moons": null, "semimajorAxis": 353343000, "perihelion": 321767000, "aphelion": 384920000, "eccentricity": 0.0893, "inclination": 7.1337, "mass": {"massValue": 2.7, "massExponent": 20}, "vol": null, "density": 1.0, "gravity": 0.0, "escape": 0.0, "meanRadius": 265.0, "equaRadius": 0.0, "polarRadius": 0.0, "flattening": 0.0, "dimension": "560x544x454", "sideralOrbit": 1325.886, "sideralRotation": 0.0, "aroundPlanet": null, "discoveredBy": "H. W. Olbers", "discoveryDate": "29/03/1807", "alternativeName": "", "axialTilt": 0, "avgTemp": 0, "mainAnomaly": 0.0, "argPeriapsis": 0.0, "longAscNode": 0.0, "bodyType": "Asteroid", "rel": "https://api.le-systeme-solaire.net/rest/bodies/vesta"}, {"id": "venus", "name": "Vénus", "englishName": "Venus", "isPlanet": true, "moons": null, "semimajorAxis": 108208475, "perihelion": 107477000, "aphelion": 108939000, "eccentricity": 0.0067, "inclination": 3.39, "mass": {"massValue": 4.86747, "massExponent": 24}, "vol": {"volValue": 9.2843, "volExponent": 11}, "density": 5.243, "gravity": 8.87, "escape": 10360.0, "meanRadius": 6051.8, "equaRadius": 6051.8, "polarRadius": 6051.8, "flattening": 0.0, "dimension": "", "sideralOrbit": 224.701, "sideralRotation": -5832.5, "aroundPlanet": null, "discoveredBy": "", "discoveryDate": "", "alternativeName": "", "axialTilt": 177.36, "avgTemp": 737, "mainAnomaly": 50.115, "argPeriapsis": 54.78, "longAscNode": 76.785, "bodyType": "Planet", "rel": "https://api.le-systeme-solaire.net/rest/bodies/venus"}, {"id": "varuna", "name": "(20000) Varuna", "englishName": "20000 Varuna", "isPlanet": false, "moons": null, "semimajorAxis": 6451398000, "perihelion": 6120810000, "aphelion": 
6781985000, "eccentricity": 0.051, "inclination": 17.158, "mass": {"massValue": 1.55, "massExponent": 20}, "vol": null, "density": 1.0, "gravity": 0.0, "escape": 0.0, "meanRadius": 330.0, "equaRadius": 0.0, "polarRadius": 0.0, "flattening": 0.0, "dimension": "", "sideralOrbit": 103440.0, "sideralRotation": 0.0, "aroundPlanet": null, "discoveredBy": "Robert S. McMillan Spacewatch", "discoveryDate": "28/11/2000", "alternativeName": "2000 WR106", "axialTilt": 0, "avgTemp": 0, "mainAnomaly": 0.0, "argPeriapsis": 0.0, "longAscNode": 0.0, "bodyType": "Asteroid", "rel": "https://api.le-systeme-solaire.net/rest/bodies/varuna"}, {"id": "vanth", "name": "Vanth", "englishName": "Vanth", "isPlanet": false, "moons": null, "semimajorAxis": 9000, "perihelion": 0, "aphelion": 0, "eccentricity": 0.0009, "inclination": 105.03, "mass": {"massValue": 3.6, "massExponent": 19}, "vol": null, "density": 1.0, "gravity": 0.0, "escape": 0.0, "meanRadius": 434.0, "equaRadius": 0.0, "polarRadius": 0.0, "flattening": 0.0, "dimension": "", "sideralOrbit": 9.539, "sideralRotation": 0.0, "aroundPlanet": {"planet": "orcus", "rel": "https://api.le-systeme-solaire.net/rest/bodies/orcus"}, "discoveredBy": "Michael E. Brown, T.A. 
Suer", "discoveryDate": "13/11/2005", "alternativeName": "S/2005 (90482) 1", "axialTilt": 0, "avgTemp": 0, "mainAnomaly": 0.0, "argPeriapsis": 0.0, "longAscNode": 0.0, "bodyType": "Moon", "rel": "https://api.le-systeme-solaire.net/rest/bodies/vanth"}, {"id": "valetudo", "name": "Valétudo", "englishName": "Valetudo", "isPlanet": false, "moons": null, "semimajorAxis": 18928000, "perihelion": 0, "aphelion": 0, "eccentricity": 0.222, "inclination": 34.0, "mass": null, "vol": null, "density": 1.0, "gravity": 0.0, "escape": 0.0, "meanRadius": 0.5, "equaRadius": 0.0, "polarRadius": 0.0, "flattening": 0.0, "dimension": "", "sideralOrbit": 533.0, "sideralRotation": 0.0, "aroundPlanet": {"planet": "jupiter", "rel": "https://api.le-systeme-solaire.net/rest/bodies/jupiter"}, "discoveredBy": "Scott Sheppard", "discoveryDate": "17/07/2018", "alternativeName": "S/2016 J 2", "axialTilt": 0, "avgTemp": 0, "mainAnomaly": 0.0, "argPeriapsis": 0.0, "longAscNode": 0.0, "bodyType": "Moon", "rel": "https://api.le-systeme-solaire.net/rest/bodies/valetudo"}, {"id": "uranus", "name": "Uranus", "englishName": "Uranus", "isPlanet": true, "moons": [{"moon": "Ariel", "rel": "https://api.le-systeme-solaire.net/rest/bodies/ariel"}, {"moon": "Umbriel", "rel": "https://api.le-systeme-solaire.net/rest/bodies/umbriel"}, {"moon": "Titania", "rel": "https://api.le-systeme-solaire.net/rest/bodies/titania"}, {"moon": "Obéron", "rel": "https://api.le-systeme-solaire.net/rest/bodies/oberon"}, {"moon": "Miranda", "rel": "https://api.le-systeme-solaire.net/rest/bodies/miranda"}, {"moon": "Cordélia", "rel": "https://api.le-systeme-solaire.net/rest/bodies/cordelia"}, {"moon": "Ophélie", "rel": "https://api.le-systeme-solaire.net/rest/bodies/ophelia"}, {"moon": "Bianca", "rel": "https://api.le-systeme-solaire.net/rest/bodies/bianca"}, {"moon": "Cressida", "rel": "https://api.le-systeme-solaire.net/rest/bodies/cressida"}, {"moon": "Desdémone", "rel": "https://api.le-systeme-solaire.net/rest/bodies/desdemona"}, 
{"moon": "Juliette", "rel": "https://api.le-systeme-solaire.net/rest/bodies/juliet"}, {"moon": "Portia", "rel": "https://api.le-systeme-solaire.net/rest/bodies/portia"}, {"moon": "Rosalinde", "rel": "https://api.le-systeme-solaire.net/rest/bodies/rosalind"}, {"moon": "Belinda", "rel": "https://api.le-systeme-solaire.net/rest/bodies/belinda"}, {"moon": "Puck", "rel": "https://api.le-systeme-solaire.net/rest/bodies/puck"}, {"moon": "Caliban", "rel": "https://api.le-systeme-solaire.net/rest/bodies/caliban"}, {"moon": "Sycorax", "rel": "https://api.le-systeme-solaire.net/rest/bodies/sycorax"}, {"moon": "Prospero", "rel": "https://api.le-systeme-solaire.net/rest/bodies/prospero"}, {"moon": "Setebos", "rel": "https://api.le-systeme-solaire.net/rest/bodies/setebos"}, {"moon": "Stephano", "rel": "https://api.le-systeme-solaire.net/rest/bodies/stephano"}, {"moon": "Trinculo", "rel": "https://api.le-systeme-solaire.net/rest/bodies/trinculo"}, {"moon": "Francisco", "rel": "https://api.le-systeme-solaire.net/rest/bodies/francisco"}, {"moon": "Margaret", "rel": "https://api.le-systeme-solaire.net/rest/bodies/margaret"}, {"moon": "Ferdinand", "rel": "https://api.le-systeme-solaire.net/rest/bodies/ferdinand"}, {"moon": "Perdita", "rel": "https://api.le-systeme-solaire.net/rest/bodies/perdita"}, {"moon": "Mab", "rel": "https://api.le-systeme-solaire.net/rest/bodies/mab"}, {"moon": "Cupid", "rel": "https://api.le-systeme-solaire.net/rest/bodies/cupid"}], "semimajorAxis": 2870658186, "perihelion": 2734998229, "aphelion": 3006318143, "eccentricity": 0.0457, "inclination": 0.772, "mass": {"massValue": 8.68127, "massExponent": 25}, "vol": {"volValue": 6.833, "volExponent": 13}, "density": 1.27, "gravity": 8.87, "escape": 21380.0, "meanRadius": 25362.0, "equaRadius": 25559.0, "polarRadius": 24973.0, "flattening": 0.02293, "dimension": "", "sideralOrbit": 30685.4, "sideralRotation": -17.24, "aroundPlanet": null, "discoveredBy": "William Herschel", "discoveryDate": "13/03/1781", 
"alternativeName": "", "axialTilt": 97.77, "avgTemp": 76, "mainAnomaly": 142.2386, "argPeriapsis": 98.862, "longAscNode": 73.967, "bodyType": "Planet", "rel": "https://api.le-systeme-solaire.net/rest/bodies/uranus"}, {"id": "umbriel", "name": "Umbriel", "englishName": "Umbriel", "isPlanet": false, "moons": null, "semimajorAxis": 266000, "perihelion": 265100, "aphelion": 267500, "eccentricity": 0.0039, "inclination": 0.13, "mass": {"massValue": 12.2, "massExponent": 20}, "vol": null, "density": 1.46, "gravity": 0.0, "escape": 0.0, "meanRadius": 584.7, "equaRadius": 0.0, "polarRadius": 0.0, "flattening": 0.0, "dimension": "", "sideralOrbit": 4.14418, "sideralRotation": 99.499, "aroundPlanet": {"planet": "uranus", "rel": "https://api.le-systeme-solaire.net/rest/bodies/uranus"}, "discoveredBy": "William Lassell", "discoveryDate": "24/10/1851", "alternativeName": "", "axialTilt": 0, "avgTemp": 0, "mainAnomaly": 0.0, "argPeriapsis": 0.0, "longAscNode": 0.0, "bodyType": "Moon", "rel": "https://api.le-systeme-solaire.net/rest/bodies/umbriel"}, {"id": "triton", "name": "Triton", "englishName": "Triton", "isPlanet": false, "moons": null, "semimajorAxis": 354760, "perihelion": 354753, "aphelion": 354765, "eccentricity": 2e-05, "inclination": 157.345, "mass": {"massValue": 2.14, "massExponent": 22}, "vol": null, "density": 2.05, "gravity": 0.78, "escape": 0.0, "meanRadius": 1353.4, "equaRadius": 0.0, "polarRadius": 0.0, "flattening": 0.0, "dimension": "", "sideralOrbit": 5.87685, "sideralRotation": 141.0444, "aroundPlanet": {"planet": "neptune", "rel": "https://api.le-systeme-solaire.net/rest/bodies/neptune"}, "discoveredBy": "William Lassell", "discoveryDate": "10/10/1846", "alternativeName": "", "axialTilt": 0, "avgTemp": 0, "mainAnomaly": 0.0, "argPeriapsis": 0.0, "longAscNode": 0.0, "bodyType": "Moon", "rel": "https://api.le-systeme-solaire.net/rest/bodies/triton"}]}, "emitted_at": 1656152784004}}
...
{"type": "LOG", "log": {"level": "INFO", "message": "Read 29 records from bodies stream"}}
{"type": "LOG", "log": {"level": "INFO", "message": "Finished syncing bodies"}}
{"type": "LOG", "log": {"level": "INFO", "message": "SourceSolar runtimes:\nSyncing stream bodies 0:00:35.475559"}}
{"type": "LOG", "log": {"level": "INFO", "message": "Finished syncing SourceSolar"}}
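Note that each RECORD message above carries a whole page of results in its bodies array, because parse_response yields the full response. To count individual bodies rather than pages, a minimal sketch could walk the output lines as follows; count_bodies is a hypothetical helper, not part of the CDK.

```python
import json
from collections import Counter
from typing import Iterable


def count_bodies(lines: Iterable[str]) -> Counter:
    """Count the items inside each RECORD's 'bodies' array, grouped by stream."""
    counts: Counter = Counter()
    for line in lines:
        try:
            message = json.loads(line)
        except json.JSONDecodeError:
            continue  # ignore lines that are not JSON messages
        if isinstance(message, dict) and message.get("type") == "RECORD":
            record = message["record"]
            # 'bodies' may be missing or null; treat both as an empty page.
            counts[record["stream"]] += len(record["data"].get("bodies") or [])
    return counts
```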
Handle Dependencies#
- Create HttpSubStream class.
class SolarSubStream(HttpSubStream, SolarStream, ABC):
    # Skip invalid links
    raise_on_http_errors = False

    def __init__(self, parent: SolarStream, **kwargs):
        super().__init__(parent=parent, **kwargs)

    def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
        """
        TODO: Override this method to define how a response is parsed.
        :return an iterable containing each record in the response
        """
        yield response.json()
- Create detail interface class.
class DetailOfBodies(SolarSubStream):
    primary_key = 'id'

    def path(self, *, stream_state: Mapping[str, Any] = None, stream_slice: Mapping[str, Any] = None,
             next_page_token: Mapping[str, Any] = None) -> str:
        return f'bodies/{stream_slice["id"]}'

    def stream_slices(self, **kwargs) -> Iterable[Optional[Mapping[str, any]]]:
        # Read every page from the parent stream and emit one slice per body id.
        for stream_slices in self.parent.stream_slices(sync_mode=SyncMode.full_refresh):
            for record in self.parent.read_records(sync_mode=SyncMode.full_refresh, stream_slice=stream_slices):
                for data in record["bodies"]:
                    yield {"id": data["id"]}

    def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
        if response.status_code == 200:
            yield response.json()
        else:
            # logger.warn is a deprecated alias; use warning instead.
            self.logger.warning(f"Get data failed: code={response.status_code}, url={response.url}, message={response.text}")
- Instantiate detail interface.
class SourceSolar(AbstractSource):
    def check_connection(self, logger, config) -> Tuple[bool, any]:
        """
        See https://github.com/airbytehq/airbyte/blob/master/airbyte-integrations/connectors/source-stripe/source_stripe/source.py#L232
        for an example.
        :param config: the user-input config object conforming to the connector's spec.yaml
        :param logger: logger object
        :return Tuple[bool, any]: (True, None) if the input config can be used to connect to the API successfully, (False, error) otherwise.
        """
        resp = requests.get(SolarStream.url_base)
        status = resp.status_code
        logger.info(f"Ping response code: {status}")
        if status == 200:
            return True, None
        else:
            message = resp.text
            return False, message

    def streams(self, config: Mapping[str, Any]) -> List[Stream]:
        """
        TODO: Replace the streams below with your own streams.
        :param config: A Mapping of the user input configuration as defined in the connector spec.
        """
        # TODO remove the authenticator if not required.
        auth = NoAuth()
        return [Bodies(authenticator=auth, config=config),
                DetailOfBodies(parent=Bodies(authenticator=auth, config=config), authenticator=auth, config=config)]
- Declare response structure.
vi ./source_solar/schemas/detail_of_bodies.json
{
"type": "object",
"required": [
"page_size"
],
"properties": {
"id": {
"type": "string"
},
"name": {
"type": "string"
},
"englishName": {
"type": "string"
},
"isPlanet": {
"type": "boolean"
},
"moons": {
"type": "array",
"items": {
"type": "object",
"properties": {
"moon": {
"type": "string"
},
"rel": {
"type": "string"
}
}
}
},
"semimajorAxis": {
"type": "number"
},
"perihelion": {
"type": "number"
},
"aphelion": {
"type": "number"
},
"eccentricity": {
"type": "number"
},
"inclination": {
"type": "number"
},
"mass": {
"type": "object",
"properties": {
"massValue": {
"type": "number"
},
"massExponent": {
"type": "integer"
}
}
},
"vol": {
"type": "object",
"properties": {
"volValue": {
"type": "number"
},
"volExponent": {
"type": "integer"
}
}
},
"density": {
"type": "number"
},
"gravity": {
"type": "number"
},
"escape": {
"type": "number"
},
"meanRadius": {
"type": "number"
},
"equaRadius": {
"type": "number"
},
"polarRadius": {
"type": "number"
},
"flattening": {
"type": "number"
},
"dimension": {
"type": "string"
},
"sideralOrbit": {
"type": "number"
},
"sideralRotation": {
"type": "number"
},
"aroundPlanet": {
"type": "object",
"properties": {
"planet": {
"type": "string"
},
"rel": {
"type": "string"
}
}
},
"discoveredBy": {
"type": "string"
},
"discoveryDate": {
"type": "string"
},
"alternativeName": {
"type": "string"
},
"axialTilt": {
"type": "number"
},
"avgTemp": {
"type": "number"
},
"mainAnomaly": {
"type": "number"
},
"argPeriapsis": {
"type": "number"
},
"longAscNode": {
"type": "number"
},
"bodyType": {
"type": "string"
}
}
}
- Update the sync configuration in ./sample_files/configured_catalog.json.
{
"streams": [
{
"stream": {
"name": "bodies",
"json_schema": {},
"supported_sync_modes": [
"full_refresh"
],
"source_defined_primary_key": [
[
"id"
]
]
},
"sync_mode": "full_refresh",
"destination_sync_mode": "overwrite"
},
{
"stream": {
"name": "detail_of_bodies",
"json_schema": {},
"supported_sync_modes": [
"full_refresh"
],
"source_defined_primary_key": [
[
"id"
]
]
},
"sync_mode": "full_refresh",
"destination_sync_mode": "overwrite"
}
]
}
- Test data reading.
python main.py read --config ./sample_files/secrets/config.json --catalog ./sample_files/configured_catalog.json
The final output indicates reading 284 records.
{"type": "LOG", "log": {"level": "INFO", "message": "Read 284 records from detail_of_bodies stream"}}
{"type": "LOG", "log": {"level": "INFO", "message": "Finished syncing detail_of_bodies"}}
{"type": "LOG", "log": {"level": "INFO", "message": "SourceSolar runtimes:\nSyncing stream bodies 0:00:51.038142\nSyncing stream detail_of_bodies 0:01:41.687393"}}
{"type": "LOG", "log": {"level": "INFO", "message": "Finished syncing SourceSolar"}}
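The parent/child dependency in the steps above comes down to two small transformations: each page read from the parent stream is flattened into one slice per body id, and each slice is turned into a detail path. The sketch below isolates that logic as plain functions so it can be tried without the CDK; slices_from_parent and detail_path are hypothetical names, and the record shape is assumed to match the bodies output shown earlier.

```python
from typing import Any, Iterable, Iterator, Mapping


def slices_from_parent(parent_records: Iterable[Mapping[str, Any]]) -> Iterator[Mapping[str, str]]:
    """Yield one {"id": ...} slice per body found in the parent's page records."""
    for record in parent_records:
        # Each parent record is a whole page; 'bodies' may be missing or null.
        for body in record.get("bodies") or []:
            yield {"id": body["id"]}


def detail_path(stream_slice: Mapping[str, str]) -> str:
    """Build the request path for one body, relative to the API base URL."""
    return f'bodies/{stream_slice["id"]}'
```

Because the child stream issues one request per slice, the number of detail requests equals the number of bodies in the parent stream, which is consistent with the detail stream above taking longer than the parent.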
Deploy Custom Connector#
- Create a connector Docker image.
docker build . -t airbyte/source-solar:dev
docker images
Display:
REPOSITORY TAG IMAGE ID CREATED SIZE
airbyte/source-solar dev b42429012ce0 22 seconds ago 120MB
- Open the Airbyte page at http://localhost:8000/, and create a new connector.
After creation, it can be seen in the connector list.
Use Custom Connector#
Create a new connection.
Execute sync.
Sync successful.
View the database.
Since we chose Normalized tabular data, the sync will automatically expand all our JSON objects, resulting in multiple tables.
Among them, solar_bodies corresponds to the raw_data, and the bodies column is the original response data.
Summary#
This concludes the introductory Airbyte tutorial. Compared to other ETL tools, Airbyte can run the ETL process directly against any clearly documented API, and combined with dbt it can also support ELT workflows. All of this requires only writing Python scripts, which makes it highly flexible. Although Airbyte is still in its infancy, its future looks promising.