-
Declarative Programming
A declarative programming paradigm inspired by Makefile
-
Dynamic DAGs
Generate snapshots of data pipeline activity based on any logic, any time
-
Event-based System
A push-based, event-driven system that monitors file and timer events
-
Multiprocess Execution
A built-in, multiprocess executor that can decouple from the orchestrator
Extremely dynamic and flexible, even when dealing with the most complex logic
- Recipe
- Document DAG
-
import re
from datetime import datetime
from tinyo import DES, TinyOConfig, Config, Execution, meta_task

# tasks
def load(context):
    print(f"Loading {context['deps']} -> {context['targs']} ...")

def transform(context) -> str:
    print(f"Transforming {', '.join(context['deps'])} -> {context['targs']} ...")

# generators
@meta_task()
def create_facts(event):
    if re.match(r'\d{8}', event):
        # do not generate the facts table on a Wednesday
        if datetime.strptime(event, '%Y%m%d').weekday() != 2:
            return (event, [DES(
                deps=frozenset([f'staging.fact_table_{event}', 'dimensional_table']),
                targs=frozenset([f'facts_with_{event}']),
                ops=[(transform, Config({}))])])
    return (None, [])

@meta_task(download=[create_facts], root=True)
def load_fact_file(event):
    if re.match(r'.*fact_file_\d{8}.csv', event):
        date = event[-12:-4]
        return (date, [DES(
            deps=frozenset([event]),
            targs=frozenset([f'staging.fact_table_{date}']),
            ops=[(load, Config({}))])])
    return (None, [])

@meta_task(download=[create_facts], root=True)
def load_dimensional_file(event):
    if re.match(r'.*dimensional_file.csv', event):
        date = datetime.now().strftime('%Y%m%d')
        return (date, [DES(
            deps=frozenset([event]),
            targs=frozenset(['dimensional_table']),
            ops=[(load, Config({}))])])
    return (None, [])

config = TinyOConfig(
    sql_conn='retail_dynamic_demo.db',
    dirs=['/tmp/data'],
    maxevents=30,
    max_workers=2,
    execution=Execution.PROCESSES,
)
STEP ONE
Ingest Recipe
TinyOrc ingests a recipe, which is itself a generator DAG that produces tasks and pipelines.
In this example, the recipe loads the daily fact and dimensional files into their respective tables.
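As a rough illustration of how a root generator decides whether a file event belongs to it, the filename matching and date extraction can be tried without TinyOrc installed. The helper name staging_target is hypothetical and not part of the tinyo API, and this sketch uses a stricter pattern (escaped dot, anchored end) than the recipe does.

```python
import re

# Assumption: file events arrive as paths such as /tmp/data/fact_file_20241027.csv
FACT_FILE = re.compile(r'.*fact_file_(\d{8})\.csv$')

def staging_target(event):
    """Return the staging table name for a fact-file event, or None if it does not match."""
    match = FACT_FILE.match(event)
    if match is None:
        return None
    return f'staging.fact_table_{match.group(1)}'

print(staging_target('/tmp/data/fact_file_20241027.csv'))
print(staging_target('/tmp/data/dimensional_file.csv'))
```

Capturing the date with a regex group gives the same eight digits as slicing the event with [-12:-4], but it does not depend on the event ending exactly in ".csv".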
-
STEP TWO
Generate & Execute Tasks
The generator produces the tasks and pipelines, which are represented by the document DAG. Once a task's upstream documents have been successfully updated in the document DAG, the task is executed.
In this example, TinyOrc generates a transform task that merges the daily fact table into the combined facts table every day except Wednesday.
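The two conditions described above (the Wednesday rule and the wait for upstream documents) can be sketched in plain Python. This is only a model of the idea under stated assumptions: should_build_facts and is_ready are hypothetical names, and TinyOrc's own bookkeeping of the document DAG may work differently.

```python
from datetime import datetime

def should_build_facts(date):
    """True when the YYYYMMDD date is not a Wednesday (weekday() == 2)."""
    return datetime.strptime(date, '%Y%m%d').weekday() != 2

def is_ready(deps, updated):
    """Assumed rule: a task is runnable once every upstream document has been updated."""
    return deps <= updated

date = '20241029'  # a Tuesday
deps = frozenset([f'staging.fact_table_{date}', 'dimensional_table'])

updated = {f'staging.fact_table_{date}'}
print(should_build_facts(date), is_ready(deps, updated))  # True False

updated.add('dimensional_table')
print(is_ready(deps, updated))  # True

print(should_build_facts('20241030'))  # False, 2024-10-30 is a Wednesday
```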
More Examples
- Print date or day of the week
- Timer
- Infinite task runs
-
Dynamic Data Pipelines
This example prints the date to the orchestrator console if the date in the filename falls on a Monday; otherwise, it prints the day of the week.
import re
from datetime import datetime
from tinyo import DES, TinyOConfig, Config, Execution, meta_task

def file_date_monday(context):
    print(context['config']['date'])

def file_date_otherdays(context):
    weekday = ['Monday', 'Tuesday', 'Wednesday', 'Thursday', 'Friday', 'Saturday', 'Sunday']
    print(weekday[datetime.strptime(context['config']['date'], '%Y%m%d').weekday()])

# generator
@meta_task(root=True)
def file_date_gen(event):
    if re.match(r'.*file_\d{8}.csv', event):
        date = event[-12:-4]
        if datetime.strptime(date, '%Y%m%d').weekday() == 0:
            return (date, [DES(
                deps=frozenset([event]),
                targs=frozenset([f'print.{date}']),
                ops=[(file_date_monday, Config({'date': date}))])])
        return (date, [DES(
            deps=frozenset([event]),
            targs=frozenset([f'print.{date}']),
            ops=[(file_date_otherdays, Config({'date': date}))])])
    return (None, [])

config = TinyOConfig(
    sql_conn='print_date_or_day.db',
    dirs=['/tmp/data'],
    maxevents=30,
    max_workers=2,
    execution=Execution.PROCESSES,
)
-
Timer Events
This example prints the date extracted from the timestamp of the timer event.
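The recipe that follows slices the event name to recover the timestamp. A small stand-alone sketch of that step is shown here, assuming timer events are named as the timer name, an underscore, and a Unix timestamp (for example ten_seconds_1729987210.0). That naming is inferred from the recipe, not from documented TinyOrc behavior, and timer_timestamp is a hypothetical helper.

```python
from datetime import datetime, timedelta

PREFIX = 'ten_seconds_'  # 12 characters, which is why the recipe uses event[12:]

def timer_timestamp(event):
    """Extract the numeric timestamp that follows the timer-name prefix."""
    return float(event[len(PREFIX):])

# Build a hypothetical event name for the first tick after the initial time.
tick = datetime(2024, 10, 27, 0, 0, 0) + timedelta(seconds=10)
event = f'{PREFIX}{tick.timestamp()}'

print(datetime.fromtimestamp(timer_timestamp(event)))
```

Using len(PREFIX) instead of a literal 12 keeps the slice correct if the timer is renamed.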
import re
from datetime import datetime, timedelta
from tinyo import DES, TinyOConfig, Config, Execution, meta_task

def print_date(context):
    print(datetime.fromtimestamp(float(context['config']['date'])))

# generator
@meta_task(root=True)
def timer_gen(event):
    if re.match(r'.*ten_seconds_.*', event):
        date = event[12:]
        return (date, [DES(
            deps=frozenset([event]),
            targs=frozenset([f'print.{event}']),
            ops=[(print_date, Config({'date': date}))])])
    return (None, [])

config = TinyOConfig(
    sql_conn='timer.db',
    dirs=['/tmp/data'],
    maxevents=30,
    max_workers=2,
    timers=[{
        'name': 'ten_seconds',
        'initial': datetime(2024, 10, 27, 0, 0, 0),
        'interval': timedelta(seconds=10)
    }],
    execution=Execution.PROCESSES,
)
-
Event-based System
Receipt of file_k (where k is a single digit) triggers the op that creates file_k+1, which in turn triggers the op again. The chain stops at file_10, because its two-digit suffix no longer matches the generator's single-digit pattern.
import re, time
from tinyo import DES, TinyOConfig, Config, Execution, meta_task

def create_file(context):
    time.sleep(1)
    with open(context['config']['name'], 'w') as fb:
        fb.write(context['config']['name'])

# generator
@meta_task(root=True)
def gen(event):
    if re.match(r'.*file_\d{1}$', event):
        # take the digit after the last underscore so that underscores
        # elsewhere in the path do not break the parse
        count = int(event[event.rfind('_')+1:])
        targ = f'/tmp/data/file_{count+1}'
        return (count, [DES(
            deps=frozenset([event]),
            targs=frozenset([targ]),
            ops=[(create_file, Config({'name': targ}))])])
    return (None, [])

config = TinyOConfig(
    sql_conn='infinite_task_runs.db',
    dirs=['/tmp/data'],
    maxevents=30,
    max_workers=2,
    execution=Execution.PROCESSES,
)
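To see why the chain ends at file_10, the generator's matching rule can be replayed in plain Python without writing any files. This is a simulation under stated assumptions, not TinyOrc itself: next_target and simulate are hypothetical helpers, and events are assumed to be full paths under /tmp/data.

```python
import re

PATTERN = re.compile(r'.*file_\d$')  # same rule as the recipe: one digit at the end

def next_target(event):
    """Return the path the op would create for this event, or None if it does not match."""
    if PATTERN.match(event) is None:
        return None
    count = int(event[event.rfind('_') + 1:])
    return f'/tmp/data/file_{count + 1}'

def simulate(event):
    """Follow the chain of created files until an event no longer matches."""
    chain = [event]
    while (target := next_target(chain[-1])) is not None:
        chain.append(target)
    return chain

chain = simulate('/tmp/data/file_1')
print(len(chain), chain[-1])  # 10 /tmp/data/file_10
```

Starting from file_1, nine ops run (creating file_2 through file_10) before the pattern stops matching.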