This package allows you to trigger syncs for Census.
Install the airflow-provider-census package from PyPI using your preferred way of installing python packages.
There are 2 ways to configure a Census connection depending on whether you are using Airflow 1.10 or Airflow 2.
The CensusHook
and CensusOperator
use the census_default
connection id as a default, although this is configurable if you would like to use your own connection id.
- Go to any sync at https://app.getcensus.com/syncs
- Click on the Sync Configuration tab.
- Next to API TRIGGER, click "Click to show"
- The url will be of the format https://bearer:secret-token:[email protected]/api/v1/syncs/0/trigger
- the secret token will be of the format "secret-token:arandomstring" in the url above, including the "secret-token:" part. Do not include the "@".
In the Airflow Connections UI, create a new connection:
- Conn ID: census_default
- Conn Type: HTTP
- Password: secret-token
In the Airflow Connections UI, create a new connection:
- Conn Id: census_default
- Conn Type: Census
- Census Secret Token: secret-token
CensusHook
is a class that inherits from HttpHook
and can be used to run http requests for Census.
You will most likely interact with the operator rather than the hook.
The hook can be imported by the following code:
from airflow_provider_census.hooks.census import CensusHook
CensusOperator
triggers a sync job in Census. The operator takes the following parameters:
- sync_id : Navigate to the sync and check the url for the sync id. For example https://app.getcensus.com/syncs/0/overview here, the sync_id would be 0.
- census_conn_id : The connection id to use. This is optional and defaults to 'census_default'.
The operator can be imported by the following code:
from airflow_provider_census.operators.census import CensusOperator
CensusSensor
polls a sync run in Census. The sensor takes the following parameters:
- sync_run_id : The sync run id you get back from the CensusOperator which triggers a new sync.
- census_conn_id : The connection id to use. This is optional and defaults to 'census_default'.
The sensor can be imported by the following code:
from airflow_provider_census.sensors.census import CensusSensor
The following example will run a Census sync once a day:
from airflow_provider_census.operators.census import CensusOperator
from airflow_provider_census.sensors.census import CensusSensor
from airflow import DAG
from airflow.utils.dates import days_ago
from datetime import timedelta
default_args = {
"owner": "airflow",
"start_date": days_ago(1)
}
dag = DAG('census', default_args = default_args)
sync = CensusOperator(sync_id = 27, dag = dag, task_id = 'sync')
sensor = CensusSensor(sync_run_id = "{{ ti.xcom_pull(task_ids = 'sync') }}", dag = dag, task_id = 'sensor')
sync >> sensor
Source code available on Github. Feedback and pull requests are greatly appreciated. Let us know if we can improve this.
👋 The folks at Census originally put this together. Have data? We'll sync your data warehouse with your CRM and the customer success apps critical to your team.
You can always contact us via [email protected] or in-app via the live chat in the bottom right corner.