Connect to Universql using the Python client library.
Universql works seamlessly with Snowflake's Python client libraries. You can use either the official Snowflake Connector for Python or SQLAlchemy.
# Install the Snowflake Connector for Python
pip install snowflake-connector-python
# Or with pandas and SQLAlchemy support
# (the extra is quoted so shells such as zsh do not treat the brackets as a glob)
pip install "snowflake-connector-python[pandas]"
pip install snowflake-sqlalchemy
import snowflake.connector
# Create a connection. Using it as a context manager closes it when the
# block exits, even if the query raises.
with snowflake.connector.connect(
    host='{universql_server}',  # Universql server
    user='your_username',
    password='your_password',
    warehouse='your_warehouse',
    database='your_database',
    schema='your_schema',
) as conn:
    # Execute a query: per-month user and country counts, newest month
    # first, limited to 12 rows.
    with conn.cursor() as cursor:
        cursor.execute("""
            SELECT
                DATE_TRUNC('month', created_at) as month,
                COUNT(*) as user_count,
                COUNT(DISTINCT country) as countries
            FROM users
            GROUP BY 1
            ORDER BY 1 DESC
            LIMIT 12
        """)
        results = cursor.fetchall()

print("Monthly user stats:", results)
import pandas as pd
from snowflake.connector.pandas_tools import write_pandas
# Create a connection. The context manager closes it once the read and the
# write are done, or as soon as either of them raises.
with snowflake.connector.connect(
    host='{universql_server}',  # Universql server
    user='your_username',
    password='your_password',
    warehouse='your_warehouse',
    database='your_database',
    schema='your_schema',
) as conn:
    # Query data into a DataFrame
    df = pd.read_sql("""
        SELECT
            product_id,
            SUM(quantity) as total_sold,
            AVG(price) as avg_price
        FROM sales
        GROUP BY product_id
        HAVING total_sold > 100
        ORDER BY total_sold DESC
    """, conn)

    # Write data back to Snowflake
    new_data = pd.DataFrame({
        'product_id': [1, 2, 3],
        'name': ['Product A', 'Product B', 'Product C'],
        'price': [99.99, 149.99, 199.99],
    })

    # write_pandas returns a 4-tuple; the last element is not needed here.
    success, nchunks, nrows, _ = write_pandas(
        conn=conn,
        df=new_data,
        table_name='PRODUCTS',
        database='your_database',
        schema='your_schema',
    )
from sqlalchemy import create_engine, text
from snowflake.sqlalchemy import URL

# Create engine
engine = create_engine(URL(
    host='{universql_server}',  # Universql server
    user='your_username',
    password='your_password',
    database='your_database',
    schema='your_schema',
    warehouse='your_warehouse',
))

# Query using SQLAlchemy.
# Raw SQL is wrapped in text(): SQLAlchemy 2.0 no longer accepts a plain
# string in Connection.execute(), and text() also works on 1.x.
with engine.connect() as conn:
    result = conn.execute(text("""
        SELECT
            category,
            COUNT(*) as product_count,
            AVG(price) as avg_price
        FROM products
        GROUP BY category
        HAVING product_count > 10
        ORDER BY avg_price DESC
    """)).fetchall()

# fetchall() has already materialized the rows, so they can be used after
# the connection is released.
for row in result:
    print(f"Category: {row.category}, Products: {row.product_count}, Avg Price: ${row.avg_price:.2f}")
import asyncio
from snowflake.connector.pandas_tools import write_pandas
import pandas as pd
async def process_data(conn, batch_id):
    """Load one batch from raw_data, double its values, and write it back.

    The pandas and connector calls are blocking, so the work is handed to
    the event loop's default executor. Awaiting it lets other coroutines
    (for example sibling batches under asyncio.gather) make progress.

    Args:
        conn: Open Snowflake connector connection used for the read and
            the write. NOTE(review): it may be used from several worker
            threads at once; the connector documents connections as
            shareable between threads -- confirm for your version.
        batch_id: Value matched against raw_data.batch_id.
    """

    def _process_batch():
        # Query data. batch_id is bound as a parameter rather than
        # formatted into the SQL text.
        df = pd.read_sql(
            """
            SELECT *
            FROM raw_data
            WHERE batch_id = %s
            """,
            conn,
            params=(batch_id,),
        )

        # Process data (vectorized instead of a per-row lambda)
        df['processed'] = df['value'] * 2

        # Write back to Snowflake
        write_pandas(
            conn=conn,
            df=df,
            table_name='PROCESSED_DATA',
            database='your_database',
            schema='your_schema',
        )

    loop = asyncio.get_running_loop()
    await loop.run_in_executor(None, _process_batch)
async def main():
    """Open one connection, run process_data for batch ids 0-9, then close it."""
    conn = snowflake.connector.connect(
        host='{universql_server}',  # Universql server
        user='your_username',
        password='your_password',
        warehouse='your_warehouse',
        database='your_database',
        schema='your_schema',
    )
    try:
        # Schedule all ten batches and wait for every one of them to finish
        tasks = [process_data(conn, i) for i in range(10)]
        await asyncio.gather(*tasks)
    finally:
        # Release the connection even when a batch raises.
        conn.close()
asyncio.run(main())
from snowflake.connector.errors import ProgrammingError, DatabaseError
# Handlers are ordered from most specific to least specific so that SQL
# errors get their detailed report before the broader fallbacks apply.
try:
    cursor = conn.cursor()
    with cursor:
        cursor.execute("SELECT * FROM non_existent_table")
except ProgrammingError as sql_err:
    # Report the message, error code, and SQL state carried by the error.
    print(f"SQL Error: {sql_err.msg}")
    print(f"Error Code: {sql_err.errno}")
    print(f"SQL State: {sql_err.sqlstate}")
except DatabaseError as db_err:
    print(f"Database Error: {db_err}")
except Exception as other_err:
    print(f"Unexpected Error: {other_err}")
Use Environment Variables
import os
from snowflake.connector import connect
# Read credentials from the environment instead of hard-coding them.
# os.environ[...] raises KeyError immediately when a variable is unset,
# rather than handing None to connect().
conn = connect(
    host='{universql_server}',  # Universql server
    user=os.environ['SNOWFLAKE_USER'],
    password=os.environ['SNOWFLAKE_PASSWORD'],
)
Connection Pooling
from contextlib import closing

from sqlalchemy import create_engine
from snowflake.sqlalchemy import URL

# NOTE(review): snowflake.connector does not appear to export a
# PooledConnection class, so pooling is delegated to SQLAlchemy's built-in
# connection pool -- confirm against the connector version you use.
# pool_size=4 with max_overflow=0 caps the pool at 4 connections.
engine = create_engine(
    URL(
        host='{universql_server}',  # Universql server
        user='your_username',
        password='your_password',
        database='your_database',
        schema='your_schema',
    ),
    pool_size=4,
    max_overflow=0,
)

# raw_connection() checks a DB-API connection out of the pool; closing it
# returns the connection to the pool instead of disconnecting.
with closing(engine.raw_connection()) as conn:
    with conn.cursor() as cursor:
        cursor.execute('SELECT * FROM users')
Batch Operations
# One (name, price) tuple per row to insert.
data = [('Product A', 99.99), ('Product B', 149.99), ('Product C', 199.99)]

# A single executemany() call sends every row with the same statement.
insert_sql = "INSERT INTO products (name, price) VALUES (%s, %s)"
cursor.executemany(insert_sql, data)