Universql
Python logo

Python

Connect to Universql using the Python client library.

Python Integration

Universql works seamlessly with Snowflake's Python client libraries. You can use either the official Snowflake Connector for Python or SQLAlchemy.

Installation

# Install Snowflake Connector for Python
pip install snowflake-connector-python
 
# Or with SQLAlchemy support
pip install snowflake-connector-python[pandas]
pip install snowflake-sqlalchemy

Quick Start with Snowflake Connector

import snowflake.connector
 
# Create a connection
conn = snowflake.connector.connect(
    host='{universql_server}',  # Universql server
    user='your_username',
    password='your_password',
    warehouse='your_warehouse',
    database='your_database',
    schema='your_schema'
)
 
# Execute a query
with conn.cursor() as cursor:
    cursor.execute("""
        SELECT 
            DATE_TRUNC('month', created_at) as month,
            COUNT(*) as user_count,
            COUNT(DISTINCT country) as countries
        FROM users
        GROUP BY 1
        ORDER BY 1 DESC
        LIMIT 12
    """)
    results = cursor.fetchall()
    
print("Monthly user stats:", results)

Using with Pandas

import pandas as pd
from snowflake.connector.pandas_tools import write_pandas
 
# Create a connection
conn = snowflake.connector.connect(
    host='{universql_server}',  # Universql server
    user='your_username',
    password='your_password',
    warehouse='your_warehouse',
    database='your_database',
    schema='your_schema'
)
 
# Query data into a DataFrame
df = pd.read_sql("""
    SELECT 
        product_id,
        SUM(quantity) as total_sold,
        AVG(price) as avg_price
    FROM sales
    GROUP BY product_id
    HAVING total_sold > 100
    ORDER BY total_sold DESC
""", conn)
 
# Write data back to Snowflake
new_data = pd.DataFrame({
    'product_id': [1, 2, 3],
    'name': ['Product A', 'Product B', 'Product C'],
    'price': [99.99, 149.99, 199.99]
})
 
success, nchunks, nrows, _ = write_pandas(
    conn=conn,
    df=new_data,
    table_name='PRODUCTS',
    database='your_database',
    schema='your_schema'
)

Using SQLAlchemy

from sqlalchemy import create_engine
from snowflake.sqlalchemy import URL
 
# Create engine
engine = create_engine(URL(
    host='{universql_server}',  # Universql server
    user='your_username',
    password='your_password',
    database='your_database',
    schema='your_schema',
    warehouse='your_warehouse'
))
 
# Query using SQLAlchemy
with engine.connect() as conn:
    result = conn.execute("""
        SELECT 
            category,
            COUNT(*) as product_count,
            AVG(price) as avg_price
        FROM products
        GROUP BY category
        HAVING product_count > 10
        ORDER BY avg_price DESC
    """).fetchall()
    
    for row in result:
        print(f"Category: {row.category}, Products: {row.product_count}, Avg Price: ${row.avg_price:.2f}")

Async Support

import asyncio
from snowflake.connector.pandas_tools import write_pandas
import pandas as pd
 
async def process_data(conn, batch_id):
    # Query data
    df = pd.read_sql(f"""
        SELECT *
        FROM raw_data
        WHERE batch_id = {batch_id}
    """, conn)
    
    # Process data
    df['processed'] = df['value'].apply(lambda x: x * 2)
    
    # Write back to Snowflake
    write_pandas(
        conn=conn,
        df=df,
        table_name='PROCESSED_DATA',
        database='your_database',
        schema='your_schema'
    )
 
async def main():
    conn = snowflake.connector.connect(
        host='{universql_server}',  # Universql server
        user='your_username',
        password='your_password',
        warehouse='your_warehouse',
        database='your_database',
        schema='your_schema'
    )
    
    # Process multiple batches concurrently
    tasks = [process_data(conn, i) for i in range(10)]
    await asyncio.gather(*tasks)
    
    conn.close()
 
asyncio.run(main())

Error Handling

from snowflake.connector.errors import ProgrammingError, DatabaseError
 
try:
    with conn.cursor() as cursor:
        cursor.execute("SELECT * FROM non_existent_table")
except ProgrammingError as e:
    print(f"SQL Error: {e.msg}")
    print(f"Error Code: {e.errno}")
    print(f"SQL State: {e.sqlstate}")
except DatabaseError as e:
    print(f"Database Error: {e}")
except Exception as e:
    print(f"Unexpected Error: {e}")

Best Practices

  1. Use Environment Variables

    import os
    from snowflake.connector import connect
     
    conn = connect(
        host='{universql_server}',  # Universql server
        user=os.getenv('SNOWFLAKE_USER'),
        password=os.getenv('SNOWFLAKE_PASSWORD')
    )
  2. Connection Pooling

    from snowflake.connector import PooledConnection
     
    with PooledConnection(
        host='{universql_server}',  # Universql server
        user='your_username',
        password='your_password',
        database='your_database',
        schema='your_schema',
        max_connections=4
    ) as conn:
        with conn.cursor() as cursor:
            cursor.execute('SELECT * FROM users')
  3. Batch Operations

    data = [
        ('Product A', 99.99),
        ('Product B', 149.99),
        ('Product C', 199.99)
    ]
     
    cursor.executemany(
        "INSERT INTO products (name, price) VALUES (%s, %s)",
        data
    )

Additional Resources