Swiss Army Knife of Data Orchestrators

A simple and flexible data orchestrator that fits your bespoke data pipeline needs, with a built-in asset lineage graph generator and a query language for probing

Contact Sales

Extremely dynamic and flexible, even when dealing with the most complex logic

  • Recipe
  • Document DAG
  • load_fact_file load_fact_file create_facts create_facts load_fact_file->create_facts load_dimensional_file load_dimensional_file load_dimensional_file->create_facts
    import re
    from datetime import datetime
    from tinyo import DES, TinyOConfig, Config, Execution, meta_task
    
    # tasks
    def load(context):
        """Print a progress line naming the task's dependencies and targets.

        Reads ``context['deps']`` and ``context['targs']`` and prints them in
        the form ``Loading <deps> -> <targs> ...``. Returns nothing.
        """
        sources = context['deps']
        destinations = context['targs']
        print(f"Loading {sources} -> {destinations} ...")
    def transform(context) -> None:
        """Print a progress line for a transform step.

        The entries of ``context['deps']`` are joined with ``', '`` (so they
        must be strings) and printed together with ``context['targs']`` in the
        form ``Transforming <deps> -> <targs> ...``.

        The function only prints; it returns ``None`` (the previous ``-> str``
        annotation did not match the body).
        """
        sources = ', '.join(context['deps'])
        print(f"Transforming {sources} -> {context['targs']} ...")
    
    # generators
    @meta_task()
    def create_facts(event):
        """Generate the facts-table transform for a ``YYYYMMDD`` event.

        Returns ``(event, [DES(...)])`` describing one ``transform`` op that
        depends on ``staging.fact_table_<event>`` and ``dimensional_table`` and
        targets ``facts_with_<event>``. Returns ``(None, [])`` when the event is
        not exactly eight digits or when the date falls on a Wednesday.

        Raises:
            ValueError: if the eight digits do not form a valid calendar date
                (raised by ``datetime.strptime``).
        """
        # fullmatch rather than match: a prefix-only match would let strings
        # such as '20241022_extra' through and then fail inside strptime.
        if re.fullmatch(r'\d{8}', event) is None:
            return (None, [])
        # do not generate the facts table on a Wednesday (weekday() == 2)
        if datetime.strptime(event, '%Y%m%d').weekday() == 2:
            return (None, [])
        return (event, [DES(
            deps=frozenset([f'staging.fact_table_{event}', 'dimensional_table']),
            targs=frozenset([f'facts_with_{event}']),
            ops=[(transform, Config({}))])])
    
    @meta_task(download=[create_facts], root=True)
    def load_fact_file(event):
        """Generate the load task for a daily ``fact_file_<YYYYMMDD>.csv`` event.

        Returns ``(date, [DES(...)])`` where ``date`` is the eight-digit stamp
        taken from the file name; the single ``load`` op depends on the event
        itself and targets ``staging.fact_table_<date>``. Returns
        ``(None, [])`` for any event that is not such a file.
        """
        # The dot before 'csv' is escaped and the whole string must match, so
        # names like 'fact_file_20241022xcsv' or '...csv.bak' are rejected.
        # The date comes from the capture group instead of a fixed slice, so
        # it cannot drift out of step with the pattern.
        matched = re.fullmatch(r'.*fact_file_(\d{8})\.csv', event)
        if matched is None:
            return (None, [])
        date = matched.group(1)
        return (date, [DES(
            deps=frozenset([event]),
            targs=frozenset([f'staging.fact_table_{date}']),
            ops=[(load, Config({}))])])
    
    @meta_task(download=[create_facts], root=True)
    def load_dimensional_file(event):
        """Generate the load task for a ``dimensional_file.csv`` event.

        Returns ``(date, [DES(...)])`` where ``date`` is today's local date as
        ``YYYYMMDD`` (the file name carries no date); the single ``load`` op
        depends on the event itself and targets ``dimensional_table``. Returns
        ``(None, [])`` for any other event.
        """
        # The dot before 'csv' is escaped and the whole string must match, so
        # look-alike names such as 'dimensional_file.csv.bak' are rejected.
        if re.fullmatch(r'.*dimensional_file\.csv', event) is None:
            return (None, [])
        date = datetime.now().strftime('%Y%m%d')
        return (date, [DES(
            deps=frozenset([event]),
            targs=frozenset(['dimensional_table']),
            ops=[(load, Config({}))])])
    
    # Orchestrator configuration for the recipe above.
    # NOTE(review): the per-field notes are inferred from the field names and
    # values only -- confirm against the TinyOConfig documentation.
    config = TinyOConfig(
        sql_conn='retail_dynamic_demo.db',  # presumably the database file used for state
        dirs=['/tmp/data'],  # presumably the directories watched for file events
        maxevents=30,  # presumably an upper bound on handled events -- TODO confirm
        max_workers=2,  # presumably the worker pool size -- TODO confirm
        execution=Execution.PROCESSES,
    )
                
    STEP ONE

    Ingest Recipe

    TinyOrc ingests a recipe, which is itself a generator DAG that produces tasks and pipelines.

    In this example, the recipe loads the daily fact and dimensional files into their respective tables.

  • cluster__tmp_data_fact_file_20241022_csv_2024_10_23_00_01_03_977612 /tmp/data/fact_file_20241022.csv 2024-10-23 00:01:03.977612 cluster_dimensional_table__tmp_data_fact_file_20241022_csv_2024_10_23_00_01_03_977612_staging_fact_table_20241022__tmp_data_fact_file_20241022_csv_2024_10_23_00_01_03_977612 cluster__tmp_data_dimensional_file_csv_2024_10_23_00_01_10_653471 /tmp/data/dimensional_file.csv 2024-10-23 00:01:10.653471 cluster_dimensional_table__tmp_data_dimensional_file_csv_2024_10_23_00_01_10_653471_staging_fact_table_20241022__tmp_data_dimensional_file_csv_2024_10_23_00_01_10_653471 cluster__tmp_data_fact_file_20241023_csv_2024_10_23_00_01_41_172853 /tmp/data/fact_file_20241023.csv 2024-10-23 00:01:41.172853 cluster__tmp_data_fact_file_20241024_csv_2024_10_23_00_01_47_840723 /tmp/data/fact_file_20241024.csv 2024-10-23 00:01:47.840723 cluster_dimensional_table__tmp_data_fact_file_20241024_csv_2024_10_23_00_01_47_840723_staging_fact_table_20241024__tmp_data_fact_file_20241024_csv_2024_10_23_00_01_47_840723 _tmp_data_fact_file_20241022_csv__tmp_data_fact_file_20241022_csv_2024_10_23_00_01_03_977612 /tmp/data/fact_file_20241022.csv staging_fact_table_20241022__tmp_data_fact_file_20241022_csv_2024_10_23_00_01_03_977612 staging.fact_table_20241022 _tmp_data_fact_file_20241022_csv__tmp_data_fact_file_20241022_csv_2024_10_23_00_01_03_977612->staging_fact_table_20241022__tmp_data_fact_file_20241022_csv_2024_10_23_00_01_03_977612 load facts_with_20241022__tmp_data_fact_file_20241022_csv_2024_10_23_00_01_03_977612 facts_with_20241022 staging_fact_table_20241022__tmp_data_fact_file_20241022_csv_2024_10_23_00_01_03_977612->facts_with_20241022__tmp_data_fact_file_20241022_csv_2024_10_23_00_01_03_977612 transform dimensional_table__tmp_data_fact_file_20241022_csv_2024_10_23_00_01_03_977612 dimensional_table _tmp_data_dimensional_file_csv__tmp_data_dimensional_file_csv_2024_10_23_00_01_10_653471 /tmp/data/dimensional_file.csv 
dimensional_table__tmp_data_dimensional_file_csv_2024_10_23_00_01_10_653471 dimensional_table _tmp_data_dimensional_file_csv__tmp_data_dimensional_file_csv_2024_10_23_00_01_10_653471->dimensional_table__tmp_data_dimensional_file_csv_2024_10_23_00_01_10_653471 load staging_fact_table_20241022__tmp_data_dimensional_file_csv_2024_10_23_00_01_10_653471 staging.fact_table_20241022 facts_with_20241022__tmp_data_dimensional_file_csv_2024_10_23_00_01_10_653471 facts_with_20241022 staging_fact_table_20241022__tmp_data_dimensional_file_csv_2024_10_23_00_01_10_653471->facts_with_20241022__tmp_data_dimensional_file_csv_2024_10_23_00_01_10_653471 transform _tmp_data_fact_file_20241023_csv__tmp_data_fact_file_20241023_csv_2024_10_23_00_01_41_172853 /tmp/data/fact_file_20241023.csv staging_fact_table_20241023__tmp_data_fact_file_20241023_csv_2024_10_23_00_01_41_172853 staging.fact_table_20241023 _tmp_data_fact_file_20241023_csv__tmp_data_fact_file_20241023_csv_2024_10_23_00_01_41_172853->staging_fact_table_20241023__tmp_data_fact_file_20241023_csv_2024_10_23_00_01_41_172853 load _tmp_data_fact_file_20241024_csv__tmp_data_fact_file_20241024_csv_2024_10_23_00_01_47_840723 /tmp/data/fact_file_20241024.csv staging_fact_table_20241024__tmp_data_fact_file_20241024_csv_2024_10_23_00_01_47_840723 staging.fact_table_20241024 _tmp_data_fact_file_20241024_csv__tmp_data_fact_file_20241024_csv_2024_10_23_00_01_47_840723->staging_fact_table_20241024__tmp_data_fact_file_20241024_csv_2024_10_23_00_01_47_840723 load facts_with_20241024__tmp_data_fact_file_20241024_csv_2024_10_23_00_01_47_840723 facts_with_20241024 staging_fact_table_20241024__tmp_data_fact_file_20241024_csv_2024_10_23_00_01_47_840723->facts_with_20241024__tmp_data_fact_file_20241024_csv_2024_10_23_00_01_47_840723 transform dimensional_table__tmp_data_fact_file_20241024_csv_2024_10_23_00_01_47_840723 dimensional_table
    STEP TWO

    Generate & Execute Tasks

    The generator produces the tasks and pipelines, which are represented by the document DAG. Once the upstream documents are successfully updated in the document DAG, the task is then executed.

    In this example, TinyOrc generates a transform task that adjoins the daily fact table into the combined facts table every day except Wednesdays.

More Examples

  • Print date or day of the week
  • Timer
  • Infinite task runs
  • Timer Events

    This example prints the date extracted from the timestamp of the timer event.

    import re
    from datetime import datetime, timedelta
    from tinyo import DES, TinyOConfig, Config, Execution, meta_task
    
    
    def print_date(context):
        """Print the datetime for the timestamp in ``context['config']['date']``.

        The value is converted with ``float`` and then
        ``datetime.fromtimestamp``, so the printed datetime is in local time.
        """
        raw_stamp = context['config']['date']
        moment = datetime.fromtimestamp(float(raw_stamp))
        print(moment)
    # generator
    @meta_task(root=True)
    def timer_gen(event):
        """Generate a ``print_date`` task for each ``ten_seconds_<timestamp>`` event.

        Returns ``(date, [DES(...)])`` where ``date`` is the text following
        ``ten_seconds_`` in the event name; it is passed to ``print_date``
        through ``Config({'date': date})``. The op depends on the event itself
        and targets ``print.<event>``. Returns ``(None, [])`` for any other
        event.
        """
        # Take the timestamp from a capture group rather than a fixed
        # ``event[12:]`` slice: the pattern allows a prefix before
        # 'ten_seconds_', which the slice would silently mis-handle.
        matched = re.match(r'.*ten_seconds_(.*)', event)
        if matched is None:
            return (None, [])
        date = matched.group(1)
        return (date, [DES(
            deps=frozenset([event]),
            targs=frozenset([f'print.{event}']),
            ops=[(print_date, Config({'date': date}))])])

    # One timer named 'ten_seconds' with a 10-second interval; its events are
    # the ones timer_gen matches.
    config = TinyOConfig(
        sql_conn='timer.db',
        dirs=['/tmp/data'],
        maxevents=30,
        max_workers=2,
        timers=[{
            'name': 'ten_seconds',
            'initial': datetime(2024, 10, 27, 0, 0, 0),
            'interval': timedelta(seconds=10)
        }],
        execution=Execution.PROCESSES,
    )
    cluster_ten_seconds_1730120947_42052_2024_10_28_13_09_07_420520 ten_seconds_1730120947.42052 2024-10-28 13:09:07.420520 cluster_ten_seconds_1730120950_000429_2024_10_28_13_09_10_000429 ten_seconds_1730120950.000429 2024-10-28 13:09:10.000429 cluster_ten_seconds_1730120960_000535_2024_10_28_13_09_20_000535 ten_seconds_1730120960.000535 2024-10-28 13:09:20.000535 cluster_ten_seconds_1730120970_000427_2024_10_28_13_09_30_000427 ten_seconds_1730120970.000427 2024-10-28 13:09:30.000427 ten_seconds_1730120947_42052_ten_seconds_1730120947_42052_2024_10_28_13_09_07_420520 ten_seconds_1730120947.42052 print_ten_seconds_1730120947_42052_ten_seconds_1730120947_42052_2024_10_28_13_09_07_420520 print.ten_seconds_1730120947.42052 ten_seconds_1730120947_42052_ten_seconds_1730120947_42052_2024_10_28_13_09_07_420520->print_ten_seconds_1730120947_42052_ten_seconds_1730120947_42052_2024_10_28_13_09_07_420520 print_date ten_seconds_1730120950_000429_ten_seconds_1730120950_000429_2024_10_28_13_09_10_000429 ten_seconds_1730120950.000429 print_ten_seconds_1730120950_000429_ten_seconds_1730120950_000429_2024_10_28_13_09_10_000429 print.ten_seconds_1730120950.000429 ten_seconds_1730120950_000429_ten_seconds_1730120950_000429_2024_10_28_13_09_10_000429->print_ten_seconds_1730120950_000429_ten_seconds_1730120950_000429_2024_10_28_13_09_10_000429 print_date ten_seconds_1730120960_000535_ten_seconds_1730120960_000535_2024_10_28_13_09_20_000535 ten_seconds_1730120960.000535 print_ten_seconds_1730120960_000535_ten_seconds_1730120960_000535_2024_10_28_13_09_20_000535 print.ten_seconds_1730120960.000535 ten_seconds_1730120960_000535_ten_seconds_1730120960_000535_2024_10_28_13_09_20_000535->print_ten_seconds_1730120960_000535_ten_seconds_1730120960_000535_2024_10_28_13_09_20_000535 print_date ten_seconds_1730120970_000427_ten_seconds_1730120970_000427_2024_10_28_13_09_30_000427 ten_seconds_1730120970.000427 
print_ten_seconds_1730120970_000427_ten_seconds_1730120970_000427_2024_10_28_13_09_30_000427 print.ten_seconds_1730120970.000427 ten_seconds_1730120970_000427_ten_seconds_1730120970_000427_2024_10_28_13_09_30_000427->print_ten_seconds_1730120970_000427_ten_seconds_1730120970_000427_2024_10_28_13_09_30_000427 print_date
  • Event-based System

    Receipt of file_k (where k is a single digit) triggers the op creating file_k+1, which in turn triggers the op again. This continues until file_10.

    import re, time
    from tinyo import DES, TinyOConfig, Config, Execution, meta_task
    
    
    def create_file(context):
        """Create the file named by ``context['config']['name']``.

        Sleeps for one second, then opens the path for writing (truncating any
        existing file) and writes the path string itself as the content.
        """
        # NOTE(review): the one-second pause presumably paces the chain of
        # file events -- confirm it is intentional.
        time.sleep(1)
        path = context['config']['name']
        with open(path, 'w') as handle:
            handle.write(path)
    # generator
    @meta_task(root=True)
    def gen(event):
        """Generate the task that creates ``file_<k+1>`` when ``file_<k>`` arrives.

        Only events ending in ``file_`` plus a single digit match, so
        ``file_9`` still produces ``/tmp/data/file_10`` but ``file_10`` itself
        matches nothing and the chain stops there. Returns
        ``(count, [DES(...)])`` with ``count`` the integer ``k``, or
        ``(None, [])`` for any other event.
        """
        # Read the digit from a capture group; slicing after the first '_' in
        # the whole path breaks as soon as a directory name contains '_'.
        matched = re.match(r'.*file_(\d{1})$', event)
        if matched is None:
            return (None, [])
        count = int(matched.group(1))
        targ = f'/tmp/data/file_{count+1}'
        return (count, [DES(
            deps=frozenset([event]),
            targs=frozenset([targ]),
            ops=[(create_file, Config({'name': targ}))])])

    config = TinyOConfig(
        sql_conn='infinite_task_runs.db',
        dirs=['/tmp/data'],
        maxevents=30,
        max_workers=2,
        execution=Execution.PROCESSES,
    )
    cluster__tmp_data_file_1_2024_11_02_04_27_38_266967 /tmp/data/file_1 2024-11-02 04:27:38.266967 cluster__tmp_data_file_2_2024_11_02_04_27_39_298972 /tmp/data/file_2 2024-11-02 04:27:39.298972 cluster__tmp_data_file_3_2024_11_02_04_27_40_330977 /tmp/data/file_3 2024-11-02 04:27:40.330977 cluster__tmp_data_file_4_2024_11_02_04_27_41_358982 /tmp/data/file_4 2024-11-02 04:27:41.358982 _tmp_data_file_1__tmp_data_file_1_2024_11_02_04_27_38_266967 /tmp/data/file_1 _tmp_data_file_2__tmp_data_file_1_2024_11_02_04_27_38_266967 /tmp/data/file_2 _tmp_data_file_1__tmp_data_file_1_2024_11_02_04_27_38_266967->_tmp_data_file_2__tmp_data_file_1_2024_11_02_04_27_38_266967 create_file _tmp_data_file_2__tmp_data_file_2_2024_11_02_04_27_39_298972 /tmp/data/file_2 _tmp_data_file_3__tmp_data_file_2_2024_11_02_04_27_39_298972 /tmp/data/file_3 _tmp_data_file_2__tmp_data_file_2_2024_11_02_04_27_39_298972->_tmp_data_file_3__tmp_data_file_2_2024_11_02_04_27_39_298972 create_file _tmp_data_file_3__tmp_data_file_3_2024_11_02_04_27_40_330977 /tmp/data/file_3 _tmp_data_file_4__tmp_data_file_3_2024_11_02_04_27_40_330977 /tmp/data/file_4 _tmp_data_file_3__tmp_data_file_3_2024_11_02_04_27_40_330977->_tmp_data_file_4__tmp_data_file_3_2024_11_02_04_27_40_330977 create_file _tmp_data_file_4__tmp_data_file_4_2024_11_02_04_27_41_358982 /tmp/data/file_4 _tmp_data_file_5__tmp_data_file_4_2024_11_02_04_27_41_358982 /tmp/data/file_5 _tmp_data_file_4__tmp_data_file_4_2024_11_02_04_27_41_358982->_tmp_data_file_5__tmp_data_file_4_2024_11_02_04_27_41_358982 create_file