Python Connector ์ฌ์ฉํ๊ธฐยถ
์ด ํญ๋ชฉ์์๋ Snowflake ์ปค๋ฅํฐ๋ฅผ ์ฌ์ฉํ์ฌ ์ฌ์ฉ์ ๋ก๊ทธ์ธ, ๋ฐ์ดํฐ๋ฒ ์ด์ค ๋ฐ ํ ์ด๋ธ ์์ฑ, ์จ์ดํ์ฐ์ค ์์ฑ, ๋ฐ์ดํฐ ์ฝ์ /๋ก๋ ๋ฐ ์ฟผ๋ฆฌ ๋ฑ์ ํ์ค Snowflake ์์ ์ ์ํํ๊ธฐ ์ํ ๋ฐฉ๋ฒ์ ๋ณด์ฌ์ฃผ๋ ์ผ๋ จ์ ์๋ฅผ ์ ๊ณตํฉ๋๋ค.
์ด ํญ๋ชฉ์ ๋ง์ง๋ง์ ์ ๊ณต๋๋ ์ํ ์ฝ๋๋ ์์๋ฅผ ์๋ํ๋ ๋จ์ผ Python ํ๋ก๊ทธ๋จ์ ํตํฉํฉ๋๋ค.
์ฐธ๊ณ
์ด์ Snowflake๋ SQL์ ์ฌ์ฉํ์ง ์๊ณ ๋ ๋ฐ์ดํฐ๋ฒ ์ด์ค, ์คํค๋ง, ํ ์ด๋ธ, ์์ , ์จ์ดํ์ฐ์ค๋ฅผ ํฌํจํ ํต์ฌ Snowflake ๋ฆฌ์์ค๋ฅผ ๊ด๋ฆฌํ ์ ์๋ ์ผ๊ธ Python API๋ฅผ ์ ๊ณตํฉ๋๋ค. ์์ธํ ๋ด์ฉ์ Snowflake Python APIs: Python์ผ๋ก Snowflake ์ค๋ธ์ ํธ ๊ด๋ฆฌํ๊ธฐ ์น์ ์ ์ฐธ์กฐํ์ญ์์ค.
์ด ํญ๋ชฉ์ ๋ด์ฉ:
๋ฐ์ดํฐ๋ฒ ์ด์ค, ์คํค๋ง ๋ฐ ์จ์ดํ์ฐ์ค ๋ง๋ค๊ธฐยถ
๋ก๊ทธ์ธํ ํ, ์์ง ์๋ ๊ฒฝ์ฐ CREATE DATABASE, CREATE SCHEMA ๋ฐ CREATE WAREHOUSE ๋ช ๋ น์ ์ฌ์ฉํ์ฌ ๋ฐ์ดํฐ๋ฒ ์ด์ค, ์คํค๋ง ๋ฐ ์จ์ดํ์ฐ์ค๋ฅผ ์์ฑํฉ๋๋ค.
์๋ ์๋ tiny_warehouse ์จ์ดํ์ฐ์ค, testdb ๋ฐ์ดํฐ๋ฒ ์ด์ค ๋ฐ testschema ์คํค๋ง๋ฅผ ์์ฑํ๋ ๋ฐฉ๋ฒ์ ๋ณด์ฌ์ค๋๋ค. ์คํค๋ง๋ฅผ ์์ฑํ ๋๋ ๋ฐ๋์ ์คํค๋ง๋ฅผ ์์ฑํ ๋ฐ์ดํฐ๋ฒ ์ด์ค์ ์ด๋ฆ์ ์ง์ ํ๊ฑฐ๋ ์คํค๋ง๋ฅผ ์์ฑํ ๋ฐ์ดํฐ๋ฒ ์ด์ค์ ์ด๋ฏธ ์ฐ๊ฒฐ๋์ด ์์ด์ผ ํจ์ ์ ์ํ์ญ์์ค. ์๋ ์์์๋ USE DATABASE ๋ช
๋ น์ ์คํํ ํ CREATE SCHEMA ๋ช
๋ น์ ์คํํ์ฌ ์ฌ๋ฐ๋ฅธ ๋ฐ์ดํฐ๋ฒ ์ด์ค์ ์คํค๋ง๊ฐ ์์ฑ๋๋๋ก ํฉ๋๋ค.
conn.cursor().execute("CREATE WAREHOUSE IF NOT EXISTS tiny_warehouse_mg") conn.cursor().execute("CREATE DATABASE IF NOT EXISTS testdb_mg") conn.cursor().execute("USE DATABASE testdb_mg") conn.cursor().execute("CREATE SCHEMA IF NOT EXISTS testschema_mg")
๋ฐ์ดํฐ๋ฒ ์ด์ค, ์คํค๋ง ๋ฐ ์จ์ดํ์ฐ์ค ์ฌ์ฉํ๊ธฐยถ
ํ ์ด๋ธ์ ์์ฑํ ๋ฐ์ดํฐ๋ฒ ์ด์ค ๋ฐ ์คํค๋ง๋ฅผ ์ง์ ํฉ๋๋ค. ๋ํ, DML ๋ฌธ ๋ฐ ์ฟผ๋ฆฌ๋ฅผ ์คํํ๊ธฐ ์ํ ๋ฆฌ์์ค๋ฅผ ์ ๊ณตํ ์จ์ดํ์ฐ์ค๋ฅผ ์ง์ ํฉ๋๋ค.
์๋ฅผ ๋ค์ด, testdb ๋ฐ์ดํฐ๋ฒ ์ด์ค, testschema ์คํค๋ง ๋ฐ tiny_warehouse ์จ์ดํ์ฐ์ค(์ด์ ์ ์์ฑํ)๋ฅผ ์ฌ์ฉํ๋ ค๋ฉด:
conn.cursor().execute("USE WAREHOUSE tiny_warehouse_mg") conn.cursor().execute("USE DATABASE testdb_mg") conn.cursor().execute("USE SCHEMA testdb_mg.testschema_mg")
ํ ์ด๋ธ ์์ฑ ๋ฐ ๋ฐ์ดํฐ ์ฝ์ ํ๊ธฐยถ
CREATE TABLE ๋ช ๋ น์ ์ฌ์ฉํ์ฌ ํ ์ด๋ธ์ ์์ฑํ๊ณ INSERT ๋ช ๋ น์ ์ฌ์ฉํ์ฌ ๋ฐ์ดํฐ๋ฅผ ํ ์ด๋ธ์ ์ฑ์๋๋ค.
์๋ฅผ ๋ค์ด, ์ด๋ฆ์ด testtable ์ธ ํ
์ด๋ธ์ ์์ฑํ๊ณ ์ด ํ
์ด๋ธ์ ํ์ 2๊ฐ ์ฝ์
ํฉ๋๋ค.
conn.cursor().execute( "CREATE OR REPLACE TABLE " "test_table(col1 integer, col2 string)") conn.cursor().execute( "INSERT INTO test_table(col1, col2) VALUES " + " (123, 'test string1'), " + " (456, 'test string2')")
๋ฐ์ดํฐ ๋ก๋ฉํ๊ธฐยถ
๊ฐ๋ณ INSERT ๋ช ๋ น์ ์ฌ์ฉํ์ฌ ํ ์ด๋ธ์ ๋ฐ์ดํฐ๋ฅผ ์ฝ์ ํ๋ ๋์ , ๋ด๋ถ ๋๋ ์ธ๋ถ ์์น์ ์คํ ์ด์ง๋ ํ์ผ์์ ๋ฐ์ดํฐ๋ฅผ ์ผ๊ด์ ์ผ๋ก ๋ก๋ํ ์ ์์ต๋๋ค.
๋ด๋ถ ์์น์์ ๋ฐ์ดํฐ ๋ณต์ฌํ๊ธฐยถ
ํธ์คํธ ์ปดํจํฐ์ ํ์ผ์์ ํ ์ด๋ธ๋ก ๋ฐ์ดํฐ๋ฅผ ๋ก๋ํ๋ ค๋ฉด, ์ฐ์ PUT ๋ช ๋ น์ ์ฌ์ฉํ์ฌ ๋ด๋ถ ์์น์ ํ์ผ์ ์คํ ์ด์งํ ํ COPY INTO <ํ ์ด๋ธ> ๋ช ๋ น์ ์ฌ์ฉํ์ฌ ํ์ผ์ ๋ฐ์ดํฐ๋ฅผ ํ ์ด๋ธ๋ก ๋ณต์ฌํฉ๋๋ค.
์:
# Putting Data con.cursor().execute("PUT file:///tmp/data/file* @%testtable") con.cursor().execute("COPY INTO testtable")์ฌ๊ธฐ์ CSV ๋ฐ์ดํฐ๋ Linux ๋๋ macOS ํ๊ฒฝ์ ๋ก์ปฌ ๋๋ ํฐ๋ฆฌ์ธ
/tmp/data์ ์ ์ฅ๋๋ฉฐ ์ด ๋๋ ํฐ๋ฆฌ์๋ ์ด๋ฆ์ดfile0,file1, โฆfile100์ธ ํ์ผ์ด ํฌํจ๋ฉ๋๋ค.
์ธ๋ถ ์์น์์ ๋ฐ์ดํฐ ๋ณต์ฌํ๊ธฐยถ
์ธ๋ถ ์์น(์ฆ, S3 ๋ฒํท)์ ์ด๋ฏธ ์คํ ์ด์ง๋ ํ์ผ์์ ํ ์ด๋ธ๋ก ๋ฐ์ดํฐ๋ฅผ ๋ก๋ํ๋ ค๋ฉด COPY INTO <ํ ์ด๋ธ> ๋ช ๋ น์ ์ฌ์ฉํฉ๋๋ค.
์:
# Copying Data con.cursor().execute(""" COPY INTO testtable FROM s3://<s3_bucket>/data/ STORAGE_INTEGRATION = myint FILE_FORMAT=(field_delimiter=',') """.format( aws_access_key_id=AWS_ACCESS_KEY_ID, aws_secret_access_key=AWS_SECRET_ACCESS_KEY))์ฌ๊ธฐ์
s3://<s3_๋ฒํท>/data/์ S3 ๋ฒํท์ ์ด๋ฆ์ ์ง์ ํฉ๋๋ค.๋ฒํท์ ํ์ผ์๋
data์ ๋์ฌ๊ฐ ์ฌ์ฉ๋ฉ๋๋ค.๋ฒํท์ ๊ณ์ ๊ด๋ฆฌ์(์ฆ, ACCOUNTADMIN ์ญํ ์ ์ฌ์ฉ์) ๋๋ ์ ์ญ CREATE INTEGRATION ๊ถํ์ด ์๋ ์ญํ ์ด CREATE STORAGE INTEGRATION ์ ์ฌ์ฉํ์ฌ ์์ฑํ ์ ์ฅ์ ํตํฉ์ผ๋ก ์ก์ธ์ค๋ฉ๋๋ค. ์ ์ฅ์ ํตํฉ์ ์ฌ์ฉํ๋ฉด ์ฌ์ฉ์๋ ๊ฐ์ธ ์ ์ฅ์ ์์น์ ์ก์ธ์คํ๊ธฐ ์ํ ์๊ฒฉ ์ฆ๋ช ์ ์ ๋ ฅํ์ง ์์๋ ๋ฉ๋๋ค.
์ฐธ๊ณ
์ด ์์์๋ format() ํจ์๋ฅผ ์ฌ์ฉํ์ฌ ๋ฌธ์ ๊ตฌ์ฑํฉ๋๋ค. ํ๊ฒฝ์ SQL ์ฝ์ ๊ณต๊ฒฉ์ ์ํ์ด ์๋ ๊ฒฝ์ฐ์๋ format() ํจ์๋ฅผ ์ฌ์ฉํ๋ ๋์ ๊ฐ์ ๋ฐ์ธ๋ฉํ๋ ๊ฒ์ด ์ข์ ์ ์์ต๋๋ค.
๋ฐ์ดํฐ ์ฟผ๋ฆฌํ๊ธฐยถ
Python์ฉ Snowflake ์ปค๋ฅํฐ๋ฅผ ํตํด ๋ค์์ ์ ์ถํ ์ ์์ต๋๋ค.
๋๊ธฐ ์ฟผ๋ฆฌ, ์ฟผ๋ฆฌ๊ฐ ์๋ฃ๋ ํ ์ ํ๋ฆฌ์ผ์ด์ ์ผ๋ก ์ ์ด๋ฅผ ๋ฐํํฉ๋๋ค.
๋น๋๊ธฐ ์ฟผ๋ฆฌ, ์ฟผ๋ฆฌ๊ฐ ์๋ฃ๋๊ธฐ ์ ์ ํ๋ฆฌ์ผ์ด์ ์ผ๋ก ์ ์ด๋ฅผ ๋ฐํํฉ๋๋ค.
์ฟผ๋ฆฌ๊ฐ ์๋ฃ๋ ํ, Cursor ์ค๋ธ์ ํธ๋ฅผ ์ฌ์ฉํ์ฌ ๊ฒฐ๊ณผ์ ๋ชจ๋ ๊ฐ์ ๊ฐ์ ธ์ฌ ์ ์์ต๋๋ค. ๊ธฐ๋ณธ์ ์ผ๋ก, Python์ฉ Snowflake ์ปค๋ฅํฐ๋ Snowflake ๋ฐ์ดํฐ ํ์
์์ ๋ค์ดํฐ๋ธ Python ๋ฐ์ดํฐ ํ์
์ผ๋ก ๊ฐ์ ๋ณํํฉ๋๋ค. (๊ฐ์ ๋ฌธ์์ด๋ก ๋ฐํํ๊ณ ์ ํ๋ฆฌ์ผ์ด์
์์ ํ์
์ ๋ณํํ๋๋ก ์ ํํ ์ ์์์ ์ ์ํ์ญ์์ค. ๋ฐ์ดํฐ ๋ณํ์ ์ฐํํ์ฌ ์ฟผ๋ฆฌ ์ฑ๋ฅ ํฅ์ํ๊ธฐ ๋ฅผ ์ฐธ์กฐํ์ญ์์ค.)
์ฐธ๊ณ
๊ธฐ๋ณธ์ ์ผ๋ก NUMBER ์ด์ ๊ฐ์ ๋ฐฐ์ ๋ฐ๋ ๋ถ๋ ์์์ ๊ฐ(float64)์ผ๋ก ๋ฐํ๋ฉ๋๋ค. fetch_pandas_all() ๋ฐ fetch_pandas_batches() ๋ฉ์๋์์ ์ด๋ฅผ 10์ง์ ๊ฐ(decimal.Decimal)์ผ๋ก ๋ฐํํ๋ ค๋ฉด connect() ๋ฉ์๋์ True ๋งค๊ฐ ๋ณ์๋ฅผ arrow_number_to_decimal ๋ก ์ค์ ํฉ๋๋ค.
๋๊ธฐ ์ฟผ๋ฆฌ ์ํํ๊ธฐยถ
๋๊ธฐ ์ฟผ๋ฆฌ๋ฅผ ์ํํ๋ ค๋ฉด Cursor ์ค๋ธ์ ํธ์ execute() ๋ฉ์๋๋ฅผ ํธ์ถํฉ๋๋ค. ์:
conn = snowflake.connector.connect( ... )
cur = conn.cursor()
cur.execute('select * from products')
cursor ๋ฅผ ์ฌ์ฉํ์ฌ ๊ฐ ๊ฐ์ ธ์ค๊ธฐ ์ ์ค๋ช
๊ณผ ๊ฐ์ด, Cursor ์ค๋ธ์ ํธ๋ฅผ ์ฌ์ฉํ์ฌ ๊ฒฐ๊ณผ์์ ๊ฐ์ ๊ฐ์ ธ์ต๋๋ค.
๋น๋๊ธฐ ์ฟผ๋ฆฌ ์ํํ๊ธฐยถ
Python์ฉ Snowflake ๋๋ผ์ด๋ฒ๋ ๋น๋๊ธฐ ์ฟผ๋ฆฌ(์ฟผ๋ฆฌ๊ฐ ์๋ฃ๋๊ธฐ ์ ์ ์ฌ์ฉ์์๊ฒ ์ ์ด๋ฅผ ๋ฐํํ๋ ์ฟผ๋ฆฌ)๋ฅผ ์ง์ํฉ๋๋ค. ์ฌ์ฉ์๋ ๋น๋๊ธฐ ์ฟผ๋ฆฌ๋ฅผ ์ ์ถํ๊ณ ํด๋ง์ ์ฌ์ฉํ์ฌ ์ฟผ๋ฆฌ ์๋ฃ ์์ ์ ๊ฒฐ์ ํ ์ ์์ต๋๋ค. ์ฟผ๋ฆฌ๊ฐ ์๋ฃ๋ ํ์๋ ๊ฒฐ๊ณผ๊ฐ ์ ๊ณต๋ฉ๋๋ค.
์ฐธ๊ณ
๋น๋๊ธฐ ์ฟผ๋ฆฌ๋ฅผ ์ํํ๋ ค๋ฉด ABORT_DETACHED_QUERY ๊ตฌ์ฑ ๋งค๊ฐ ๋ณ์๊ฐ FALSE (๊ธฐ๋ณธ๊ฐ)์ธ์ง ํ์ธํด์ผ ํฉ๋๋ค.
ํด๋ผ์ด์ธํธ์์ ์ฐ๊ฒฐ์ด ๋์ด์ง ๊ฒฝ์ฐ:
๋๊ธฐ์ ์ฟผ๋ฆฌ์ ๊ฒฝ์ฐ ์งํ ์ค์ธ ๋ชจ๋ ๋๊ธฐ์ ์ฟผ๋ฆฌ๋ ๋งค๊ฐ ๋ณ์ ๊ฐ์ ๊ด๊ณ์์ด ์ฆ์ ์ค๋จ๋ฉ๋๋ค.
๋น๋๊ธฐ ์ฟผ๋ฆฌ์ ๊ฒฝ์ฐ:
ABORT_DETACHED_QUERY๊ฐ
FALSE๋ก ์ค์ ๋ ๊ฒฝ์ฐ ์งํ ์ค์ธ ๋น๋๊ธฐ ์ฟผ๋ฆฌ๊ฐ ์ ์์ ์ผ๋ก ์ข ๋ฃ๋ ๋๊น์ง ๊ณ์ ์คํ๋ฉ๋๋ค.ABORT_DETACHED_QUERY๊ฐ
TRUE๋ก ์ค์ ๋ ๊ฒฝ์ฐ 5๋ถ ํ์๋ ํด๋ผ์ด์ธํธ ์ฐ๊ฒฐ์ด ๋ค์ ์ค์ ๋์ง ์์ผ๋ฉด Snowflake๋ ์งํ ์ค์ธ ๋ชจ๋ ๋น๋๊ธฐ ์ฟผ๋ฆฌ๋ฅผ ์๋์ผ๋ก ์ค๋จํฉ๋๋ค.cursor.query_result(queryId)๋ฅผ ํธ์ถํ์ฌ 5๋ถ์ด ์ง๋๋ฉด ๋น๋๊ธฐ ์ฟผ๋ฆฌ๊ฐ ์ค๋จ๋๋ ๊ฒ์ ๋ฐฉ์งํ ์ ์์ต๋๋ค. ์ด ํธ์ถ์ ์ฟผ๋ฆฌ๊ฐ ์์ง ์คํ ์ค์ด๋ฏ๋ก ์ค์ ์ฟผ๋ฆฌ ๊ฒฐ๊ณผ๋ฅผ ๊ฒ์ํ์ง๋ ์์ง๋ง ์ฟผ๋ฆฌ๊ฐ ์ทจ์๋๋ ๊ฒ์ ๋ฐฉ์งํฉ๋๋ค.query_resultํธ์ถ์ ๋๊ธฐ ์์ ์ด๋ฏ๋ก ํน์ ์ฌ์ฉ ์ฌ๋ก์ ์ ํฉํ ์๋ ์๊ณ ์ ํฉํ์ง ์์ ์๋ ์์ต๋๋ค.
์ด ๊ธฐ๋ฅ์ ์ฌ์ฉํ๋ฉด ๊ฐ ์ฟผ๋ฆฌ๊ฐ ์๋ฃ๋ ๋๊น์ง ๊ธฐ๋ค๋ฆด ํ์ ์์ด ์ฌ๋ฌ ์ฟผ๋ฆฌ๋ฅผ ๋ณ๋ ฌ๋ก ์ ์ถํ ์ ์์ต๋๋ค. ๋ํ, ๋์ผํ ์ธ์ ๋์ ๋๊ธฐ ๋ฐ ๋น๋๊ธฐ ์ฟผ๋ฆฌ์ ์กฐํฉ์ ์คํํ ์๋ ์์ต๋๋ค.
์ฐธ๊ณ
๋จ์ผ ์ฟผ๋ฆฌ์์ ์ฌ๋ฌ ๊ฐ์ ๋ฌธ์ ์คํํ๋ ค๋ฉด ์ธ์ ์์ ์ ํจํ ์จ์ดํ์ฐ์ค๋ฅผ ์ฌ์ฉํ ์ ์์ด์ผ ํฉ๋๋ค.
๋ง์ง๋ง์ผ๋ก, ํ ์ฐ๊ฒฐ์์ ๋น๋๊ธฐ ์ฟผ๋ฆฌ๋ฅผ ์ ์ถํ๊ณ ๋ค๋ฅธ ์ฐ๊ฒฐ์์ ๊ฒฐ๊ณผ๋ฅผ ํ์ธํ ์ ์์ต๋๋ค. ์๋ฅผ ๋ค์ด, ์ฌ์ฉ์๋ ์ ํ๋ฆฌ์ผ์ด์ ์์ ์ฅ๊ธฐ ์คํ ์ฟผ๋ฆฌ๋ฅผ ์์ํ๊ณ ์ ํ๋ฆฌ์ผ์ด์ ์ ์ข ๋ฃํ๋ฉฐ ๋์ค์ ๊ฒฐ๊ณผ๋ฅผ ํ์ธํ ์ ์๋๋ก ์ ํ๋ฆฌ์ผ์ด์ ์ ๋ค์ ์์ํ ์ ์์ต๋๋ค.
๋น๋๊ธฐ ์ฟผ๋ฆฌ ์ ์ถํ๊ธฐยถ
๋น๋๊ธฐ ์ฟผ๋ฆฌ๋ฅผ ์ ์ถํ๋ ค๋ฉด Cursor ์ค๋ธ์ ํธ์ execute_async() ๋ฉ์๋๋ฅผ ํธ์ถํฉ๋๋ค. ์:
conn = snowflake.connector.connect( ... )
cur = conn.cursor()
# Submit an asynchronous query for execution.
cur.execute_async('select count(*) from table(generator(timeLimit => 25))')
์ฟผ๋ฆฌ๋ฅผ ์ ์ถํ ํ:
์ฟผ๋ฆฌ๊ฐ ์คํ ์ค์ธ์ง ์ฌ๋ถ๋ฅผ ํ์ธํ๋ ค๋ฉด, ์ฟผ๋ฆฌ ์ํ ํ์ธํ๊ธฐ ๋ฅผ ์ฐธ์กฐํ์ญ์์ค.
์ฟผ๋ฆฌ ๊ฒฐ๊ณผ๋ฅผ ๊ฒ์ํ๋ ค๋ฉด, ์ฟผ๋ฆฌ ID๋ฅผ ์ฌ์ฉํ์ฌ ์ฟผ๋ฆฌ ๊ฒฐ๊ณผ ๊ฒ์ํ๊ธฐ ๋ฅผ ์ฐธ์กฐํ์ญ์์ค.
๋น๋๊ธฐ ์ฟผ๋ฆฌ๋ฅผ ์ํํ๋ ์๋ ๋น๋๊ธฐ ์ฟผ๋ฆฌ์ ์ ๋ฅผ ์ฐธ์กฐํ์ญ์์ค.
๋น๋๊ธฐ ์ฟผ๋ฆฌ์ ๋ชจ๋ฒ ์ฌ๋กยถ
๋น๋๊ธฐ ์ฟผ๋ฆฌ๋ฅผ ์ ์ถํ ๋ ๋ฐ๋ผ์ผ ํ๋ ๋ชจ๋ฒ ์ฌ๋ก๋ ๋ค์๊ณผ ๊ฐ์ต๋๋ค.
๋ค๋ฅธ ์ฟผ๋ฆฌ์ ์ข ์๋ ์ฟผ๋ฆฌ๋ฅผ ํ์ธํ ํ ์ฟผ๋ฆฌ๋ฅผ ๋ณ๋ ฌ๋ก ์คํํฉ๋๋ค. ์ผ๋ถ ์ฟผ๋ฆฌ๋ ์ํธ ์์กด์ ์ด๊ณ ์์๊ฐ ์ค์ํ๋ฏ๋ก, ๋ณ๋ ฌ ์คํ์ ์ ํฉํ์ง ์์ต๋๋ค. ์๋ฅผ ๋ค์ด, INSERT ๋ฌธ์ ํด๋น CREATE TABLE ๋ฌธ์ด ์ข ๋ฃ๋ ๋๊น์ง ์์๋์ง ์์์ผ ํฉ๋๋ค.
์ฌ์ฉ ๊ฐ๋ฅํ ๋ฉ๋ชจ๋ฆฌ์ ๋ํด ๋๋ฌด ๋ง์ ์ฟผ๋ฆฌ๋ฅผ ์คํํ์ง ์์์ผ ํฉ๋๋ค. ์ฌ๋ฌ ์ฟผ๋ฆฌ๋ฅผ ๋ณ๋ ฌ๋ก ์คํํ๋ฉด ์ผ๋ฐ์ ์ผ๋ก ๋ฉ๋ชจ๋ฆฌ ์ฌ์ฉ๋์ด ์ฆ๊ฐํ๊ฒ ๋ฉ๋๋ค. ํนํ, ๋ฉ๋ชจ๋ฆฌ์ 2๊ฐ ์ด์์ ๊ฒฐ๊ณผ ์ธํธ๊ฐ ๋ฉ๋ชจ๋ฆฌ์ ๋์์ ์ ์ฅ๋์ด ์๋ ๊ฒฝ์ฐ์๋ ์ฌ์ฉ๋์ด ํฌ๊ฒ ์ฆ๊ฐํฉ๋๋ค.
ํด๋ง ์ค์ ์ฟผ๋ฆฌ๊ฐ ์คํจํ๋ ๋๋ฌธ ๊ฒฝ์ฐ๋ฅผ ์ฒ๋ฆฌํด์ผ ํฉ๋๋ค.
ํธ๋์ญ์ ์ ์ด ๋ฌธ(BEGIN, COMMIT ๋ฐ ROLLBACK)์ ๋ค๋ฅธ ๋ฌธ๊ณผ ๋ณ๋ ฌ๋ก ์คํ๋์ง ์์์ผ ํฉ๋๋ค.
SQL ์์ฒด์ ORDER BY ์ ์ด ์๋ค ํด๋ ๋น๋๊ธฐ ์ฟผ๋ฆฌ๊ฐ ๋ฐ๋์ ์ ๋ ฌ๋ ๊ฒฐ๊ณผ๋ฅผ ๋ฐํํ์ง๋ ์๋๋ค๋ ์ ์ ์ ์ํ์ธ์. ๊ฒฐ๊ณผ์ ์ผ๋ก
result_scanํจ์๋ ์ ๋ ฌ๋ ๊ฒฐ๊ณผ๋ฅผ ๋ณด์ฅํ์ง ์์ต๋๋ค.
Snowflake ์ฟผ๋ฆฌ ID ๊ฒ์ํ๊ธฐยถ
์ฟผ๋ฆฌ ID๋ฅผ ํตํด Snowflake์ ์ํด ์คํ๋๋ ๊ฐ ์ฟผ๋ฆฌ๋ฅผ ์๋ณํ ์ ์์ต๋๋ค. Python์ฉ Snowflake ์ปค๋ฅํฐ๋ฅผ ์ฌ์ฉํ์ฌ ์ฟผ๋ฆฌ๋ฅผ ์คํํ๋ ๊ฒฝ์ฐ์๋ Cursor ์ค๋ธ์ ํธ์ sfqid ์์ฑ์ ํตํด ์ฟผ๋ฆฌ ID์ ์ก์ธ์คํ ์ ์์ต๋๋ค.
# Retrieving a Snowflake Query ID cur = con.cursor() cur.execute("SELECT * FROM testtable") print(cur.sfqid)
์ฟผ๋ฆฌ ID๋ฅผ ์ฌ์ฉํ์ฌ ์ํํ ์ ์๋ ์์ ์ ๋ค์๊ณผ ๊ฐ์ต๋๋ค.
์น ์ธํฐํ์ด์ค์์ ์ฟผ๋ฆฌ์ ์ํ๋ฅผ ํ์ธํฉ๋๋ค.
Classic Console ์์ ์ฟผ๋ฆฌ ID๋ History
ํ์ด์ง์ ํ์๋ฉ๋๋ค. ๋ด์ญ ํ์ด์ง๋ฅผ ์ฌ์ฉํ์ฌ ์ฟผ๋ฆฌ ๋ชจ๋ํฐ๋งํ๊ธฐ ์น์ ์ ์ฐธ์กฐํ์ญ์์ค.
์ฟผ๋ฆฌ์ ์ํ๋ฅผ ํ๋ก๊ทธ๋๋ฐ ๋ฐฉ์์ผ๋ก ํ์ธํฉ๋๋ค(์: ๋น๋๊ธฐ ์ฟผ๋ฆฌ์ ์๋ฃ ์ฌ๋ถ ํ์ธ).
์ฟผ๋ฆฌ ์ํ ํ์ธํ๊ธฐ ์น์ ์ ์ฐธ์กฐํ์ญ์์ค.
๋น๋๊ธฐ ์ฟผ๋ฆฌ ๋๋ ์ด์ ์ ์ ์ถํ ๋๊ธฐ ์ฟผ๋ฆฌ์ ๊ฒฐ๊ณผ๋ฅผ ๊ฒ์ํฉ๋๋ค.
์ฟผ๋ฆฌ ID๋ฅผ ์ฌ์ฉํ์ฌ ์ฟผ๋ฆฌ ๊ฒฐ๊ณผ ๊ฒ์ํ๊ธฐ ์น์ ์ ์ฐธ์กฐํ์ญ์์ค.
์คํ ์ค์ธ ์ฟผ๋ฆฌ๋ฅผ ์ทจ์ํฉ๋๋ค.
์ฟผ๋ฆฌ ID๋ฅผ ๊ธฐ์ค์ผ๋ก ์ฟผ๋ฆฌ ์ทจ์ํ๊ธฐ ์น์ ์ ์ฐธ์กฐํ์ญ์์ค.
์ฟผ๋ฆฌ ์ํ ํ์ธํ๊ธฐยถ
์ฟผ๋ฆฌ์ ์ํ๋ฅผ ํ์ธํ๋ ค๋ฉด:
Cursor์ค๋ธ์ ํธ์sfqidํ๋์์ ์ฟผ๋ฆฌ ID๋ฅผ ๊ฐ์ ธ์ต๋๋ค.Connection์ค๋ธ์ ํธ์get_query_status()๋ฉ์๋๋ก ์ฟผ๋ฆฌ ID๋ฅผ ์ ๋ฌํ์ฌ ์ฟผ๋ฆฌ์ ์ํ๋ฅผ ๋ํ๋ด๋QueryStatus์ด๊ฑฐํ ์์๋ฅผ ๋ฐํํฉ๋๋ค.๊ธฐ๋ณธ์ ์ผ๋ก
get_query_status()์์๋ ์ฟผ๋ฆฌ ๊ฒฐ๊ณผ๊ฐ ์ค๋ฅ์ธ ๊ฒฝ์ฐ ์ค๋ฅ๊ฐ ๋ฐ์๋์ง ์์ต๋๋ค. ์ค๋ฅ๊ฐ ๋ฐ์ํ๋๋ก ํ๋ ค๋ฉด, ๋์get_query_status_throw_if_error()๋ฉ์๋๋ฅผ ํธ์ถํด์ผ ํฉ๋๋ค.์ฟผ๋ฆฌ์ ์ํ๋ฅผ ํ์ธํ๋ ค๋ฉด
QueryStatus์ด๊ฑฐํ ์์๋ฅผ ์ฌ์ฉํฉ๋๋ค.์ฟผ๋ฆฌ๊ฐ ์์ง ์คํ ์ค์ธ์ง ํ์ธ(์: ๋น๋๊ธฐ ์ฟผ๋ฆฌ์ธ์ง ํ์ธ)ํ๋ ค๋ฉด
Connection์ค๋ธ์ ํธ์is_still_running()๋ฉ์๋๋ก ์ด ์์๋ฅผ ์ ๋ฌํฉ๋๋ค.์ค๋ฅ ๋ฐ์ ์ฌ๋ถ๋ฅผ ํ์ธํ๋ ค๋ฉด ์ด ์์๋ฅผ
is_an_error()๋ฉ์๋๋ก ์ ๋ฌํฉ๋๋ค.
์ด๊ฑฐํ ์์์ ์ ์ฒด ๋ชฉ๋ก์
QueryStatus๋ฅผ ์ฐธ์กฐํ์ญ์์ค.
๋ค์ ์์์๋ ๋น๋๊ธฐ ์ฟผ๋ฆฌ๋ฅผ ์คํํ ํ ์ฟผ๋ฆฌ์ ์ํ๋ฅผ ํ์ธํฉ๋๋ค.
import time
...
# Execute a long-running query asynchronously.
cur.execute_async('select count(*) from table(generator(timeLimit => 25))')
...
# Wait for the query to finish running.
query_id = cur.sfqid
while conn.is_still_running(conn.get_query_status(query_id)):
time.sleep(1)
๋ค์ ์์์๋ ์ฟผ๋ฆฌ ๊ฒฐ๊ณผ์ ์ค๋ฅ๊ฐ ์๋ ๊ฒฝ์ฐ ์ค๋ฅ๊ฐ ๋ฐ์ํฉ๋๋ค.
from snowflake.connector import ProgrammingError
import time
...
# Wait for the query to finish running and raise an error
# if a problem occurred with the execution of the query.
try:
query_id = cur.sfqid
while conn.is_still_running(conn.get_query_status_throw_if_error(query_id)):
time.sleep(1)
except ProgrammingError as err:
print('Programming Error: {0}'.format(err))
์ฟผ๋ฆฌ ID๋ฅผ ์ฌ์ฉํ์ฌ ์ฟผ๋ฆฌ ๊ฒฐ๊ณผ ๊ฒ์ํ๊ธฐยถ
์ฐธ๊ณ
Cursor ์ค๋ธ์ ํธ์ ๋ํ execute() ๋ฉ์๋๋ฅผ ํธ์ถํ์ฌ ๋น๋๊ธฐ ์ฟผ๋ฆฌ๋ฅผ ์คํํ ๊ฒฝ์ฐ์๋ ๊ฒฐ๊ณผ๋ฅผ ๊ฒ์ํ๊ธฐ ์ํด ์ฟผ๋ฆฌ ID๋ฅผ ์ฌ์ฉํ ํ์๊ฐ ์์ต๋๋ค. cursor ๋ฅผ ์ฌ์ฉํ์ฌ ๊ฐ ๊ฐ์ ธ์ค๊ธฐ ์์์ ์ค๋ช
๊ณผ ๊ฐ์ด, ๊ฒฐ๊ณผ์์ ๊ฐ์ ๊ฐ์ ธ์ค๊ธฐ๋ง ํ๋ฉด ๋ฉ๋๋ค.
๋น๋๊ธฐ ์ฟผ๋ฆฌ ๋๋ ์ด์ ์ ์ ์ถํ ๋๊ธฐ ์ฟผ๋ฆฌ์ ๊ฒฐ๊ณผ๋ฅผ ๊ฒ์ํ๋ ค๋ฉด, ๋ค์ ๋จ๊ณ๋ฅผ ๋ฐ๋ฅด์ญ์์ค.
์ฟผ๋ฆฌ์ ์ฟผ๋ฆฌ ID๋ฅผ ๊ฐ์ ธ์ต๋๋ค. Snowflake ์ฟผ๋ฆฌ ID ๊ฒ์ํ๊ธฐ ์น์ ์ ์ฐธ์กฐํ์ญ์์ค.
Cursor์ค๋ธ์ ํธ์get_results_from_sfqid()๋ฉ์๋๋ฅผ ํธ์ถํ์ฌ ๊ฒฐ๊ณผ๋ฅผ ๊ฒ์ํฉ๋๋ค.cursor ๋ฅผ ์ฌ์ฉํ์ฌ ๊ฐ ๊ฐ์ ธ์ค๊ธฐ ์ ์ค๋ช ๊ณผ ๊ฐ์ด,
Cursor์ค๋ธ์ ํธ๋ฅผ ์ฌ์ฉํ์ฌ ๊ฒฐ๊ณผ์์ ๊ฐ์ ๊ฐ์ ธ์ต๋๋ค.
์ฟผ๋ฆฌ๊ฐ ์์ง ์คํ ์ค์ธ ๊ฒฝ์ฐ, ํ์น ๋ฉ์๋(fetchone(), fetchmany(), fetchall() ๋ฑ)๋ ์ฟผ๋ฆฌ๊ฐ ์๋ฃ๋ ๋๊น์ง ๋๊ธฐํฉ๋๋ค.
์:
# Get the results from a query.
cur.get_results_from_sfqid(query_id)
results = cur.fetchall()
print(f'{results[0]}')
cursor ๋ฅผ ์ฌ์ฉํ์ฌ ๊ฐ ๊ฐ์ ธ์ค๊ธฐยถ
์ปค์ ์ค๋ธ์ ํธ ๋ฐ๋ณต๊ธฐ ๋ฉ์๋๋ฅผ ์ฌ์ฉํ์ฌ ํ ์ด๋ธ์์ ๊ฐ์ ๊ฐ์ ธ์ต๋๋ค.
์๋ฅผ ๋ค์ด, ํ
์ด๋ธ ์์ฑ ๋ฐ ๋ฐ์ดํฐ ์ฝ์
ํ๊ธฐ ์์ ์ด์ ์ ์์ฑํ testtable ํ
์ด๋ธ์์ โcol1โ ๋ฐ โcol2โ ์ด์ ๊ฐ์ ธ์ค๋ ค๋ฉด ๋ค์๊ณผ ์ ์ฌํ ์ฝ๋๋ฅผ ์ฌ์ฉํฉ๋๋ค.
cur = conn.cursor() try: cur.execute("SELECT col1, col2 FROM test_table ORDER BY col1") for (col1, col2) in cur: print('{0}, {1}'.format(col1, col2)) finally: cur.close()
๋๋ Python์ฉ Snowflake ์ปค๋ฅํฐ๊ฐ ํธ๋ฆฌํ ๋ฐ๋ก ๊ฐ๊ธฐ๋ฅผ ์ ๊ณตํฉ๋๋ค.
for (col1, col2) in con.cursor().execute("SELECT col1, col2 FROM testtable"): print('{0}, {1}'.format(col1, col2))
๋จ์ผ ๊ฒฐ๊ณผ(์ฆ, ๋จ์ผ ํ)๋ฅผ ๊ฐ์ ธ์ค๋ ค๋ฉด fetchone ๋ฉ์๋๋ฅผ ์ฌ์ฉํฉ๋๋ค.
col1, col2 = con.cursor().execute("SELECT col1, col2 FROM testtable").fetchone() print('{0}, {1}'.format(col1, col2))
์ง์ ๋ ํ์ ๊ฐ์๋ฅผ ํ ๋ฒ์ ๊ฐ์ ธ์ค๋ ค๋ฉด ํ์ ๊ฐ์์ ํจ๊ป fetchmany ๋ฉ์๋๋ฅผ ์ฌ์ฉํฉ๋๋ค.
cur = con.cursor().execute("SELECT col1, col2 FROM testtable") ret = cur.fetchmany(3) print(ret) while len(ret) > 0: ret = cur.fetchmany(3) print(ret)์ฐธ๊ณ
๊ฒฐ๊ณผ ์ธํธ๊ฐ ๋๋ฌด ์ปค ๋ฉ๋ชจ๋ฆฌ์ ์ ํฉํ์ง ์์ ๊ฒฝ์ฐ์๋
fetchone๋๋fetchmany๋ฅผ ์ฌ์ฉํฉ๋๋ค.
๋ชจ๋ ๊ฒฐ๊ณผ๋ฅผ ํ ๋ฒ์ ๊ฐ์ ธ์ค๋ ค๋ฉด:
results = con.cursor().execute("SELECT col1, col2 FROM testtable").fetchall() for rec in results: print('%s, %s' % (rec[0], rec[1]))
์ฟผ๋ฆฌ์ ๋ํ ์๊ฐ ์ด๊ณผ๋ฅผ ์ค์ ํ๋ ค๋ฉด โbeginโ ๋ช ๋ น์ ์คํํ๊ณ ์ฟผ๋ฆฌ์ ์๊ฐ ์ด๊ณผ ๋งค๊ฐ ๋ณ์๋ฅผ ํฌํจํฉ๋๋ค. ์ฟผ๋ฆฌ๊ฐ ๋งค๊ฐ ๋ณ์ ๊ฐ์ ๊ธธ์ด๋ฅผ ์ด๊ณผํ๋ฉด ์ค๋ฅ๊ฐ ๋ฐ์ํ๊ณ ๋กค๋ฐฑ์ด ์ํ๋ฉ๋๋ค.
๋ค์ ์ฝ๋์์ 604 ์ค๋ฅ๋ ์ฟผ๋ฆฌ๊ฐ ์ทจ์๋์์์ ์๋ฏธํฉ๋๋ค. ์๊ฐ ์ด๊ณผ ๋งค๊ฐ ๋ณ์๋ฅผ ํตํด Timer() ๊ฐ ์์๋๊ณ ์ฟผ๋ฆฌ๊ฐ ์ง์ ๋ ์๊ฐ ๋ด์ ์๋ฃ๋์ง ์์ผ๋ฉด ์ทจ์๋ฉ๋๋ค.
conn.cursor().execute("create or replace table testtbl(a int, b string)") conn.cursor().execute("begin") try: conn.cursor().execute("insert into testtbl(a,b) values(3, 'test3'), (4,'test4')", timeout=10) # long query except ProgrammingError as e: if e.errno == 604: print("timeout") conn.cursor().execute("rollback") else: raise e else: conn.cursor().execute("commit")
DictCursor ๋ฅผ ์ฌ์ฉํ์ฌ ์ด ์ด๋ฆ์ ๊ธฐ์ค์ผ๋ก ๊ฐ ๊ฐ์ ธ์ค๊ธฐยถ
์ด ์ด๋ฆ์ ๊ธฐ์ค์ผ๋ก ๊ฐ์ ๊ฐ์ ธ์ค๋ ค๋ฉด, DictCursor ํ์
์ cursor ์ค๋ธ์ ํธ๋ฅผ ์์ฑํฉ๋๋ค.
์:
# Querying data by DictCursor from snowflake.connector import DictCursor cur = con.cursor(DictCursor) try: cur.execute("SELECT col1, col2 FROM testtable") for rec in cur: print('{0}, {1}'.format(rec['COL1'], rec['COL2'])) finally: cur.close()
๋น๋๊ธฐ ์ฟผ๋ฆฌ์ ์ยถ
๋น๋๊ธฐ ์ฟผ๋ฆฌ์ ๊ฐ๋จํ ์๋ ๋ค์๊ณผ ๊ฐ์ต๋๋ค.
from snowflake.connector import ProgrammingError
import time
conn = snowflake.connector.connect( ... )
cur = conn.cursor()
# Submit an asynchronous query for execution.
cur.execute_async('select count(*) from table(generator(timeLimit => 25))')
# Retrieve the results.
cur.get_results_from_sfqid(query_id)
results = cur.fetchall()
print(f'{results[0]}')
๋ค์ ์์์๋ 1๊ฐ์ ์ฐ๊ฒฐ์์ ๋น๋๊ธฐ ์ฟผ๋ฆฌ๋ฅผ ์ ์ถํ๊ณ ๋ค๋ฅธ ์ฐ๊ฒฐ์์ ๊ฒฐ๊ณผ๋ฅผ ๊ฒ์ํฉ๋๋ค.
from snowflake.connector import ProgrammingError
import time
conn = snowflake.connector.connect( ... )
cur = conn.cursor()
# Submit an asynchronous query for execution.
cur.execute_async('select count(*) from table(generator(timeLimit => 25))')
# Get the query ID for the asynchronous query.
query_id = cur.sfqid
# Close the cursor and the connection.
cur.close()
conn.close()
# Open a new connection.
new_conn = snowflake.connector.connect( ... )
# Create a new cursor.
new_cur = new_conn.cursor()
# Retrieve the results.
new_cur.get_results_from_sfqid(query_id)
results = new_cur.fetchall()
print(f'{results[0]}')
์ฟผ๋ฆฌ ID๋ฅผ ๊ธฐ์ค์ผ๋ก ์ฟผ๋ฆฌ ์ทจ์ํ๊ธฐยถ
์ฟผ๋ฆฌ ID ๋ฅผ ๊ธฐ์ค์ผ๋ก ์ฟผ๋ฆฌ๋ฅผ ์ทจ์ํฉ๋๋ค.
cur = cn.cursor() try: cur.execute(r"SELECT SYSTEM$CANCEL_QUERY('queryID')") result = cur.fetchall() print(len(result)) print(result[0]) finally: cur.close()
โqueryIDโ ๋ฌธ์์ด์ ์ค์ ์ฟผ๋ฆฌ ID๋ก ๋ฐ๊ฟ๋๋ค. ์ฟผ๋ฆฌ์ ID๋ฅผ ๊ฐ์ ธ์ค๋ ค๋ฉด Snowflake ์ฟผ๋ฆฌ ID ๊ฒ์ํ๊ธฐ ๋ฅผ ์ฐธ์กฐํ์ญ์์ค.
๋ฐ์ดํฐ ๋ณํ์ ์ฐํํ์ฌ ์ฟผ๋ฆฌ ์ฑ๋ฅ ํฅ์ํ๊ธฐยถ
์ฟผ๋ฆฌ ์ฑ๋ฅ์ ํฅ์ํ๋ ค๋ฉด, snowflake.connector.converter_null ๋ชจ๋์ SnowflakeNoConverterToPython ํด๋์ค๋ฅผ ์ฌ์ฉํ์ฌ Snowflake ๋ด๋ถ ๋ฐ์ดํฐ ํ์
์์ ๋ค์ดํฐ๋ธ Python ๋ฐ์ดํฐ ํ์
์ผ๋ก์ ๋ฐ์ดํฐ ๋ณํ์ ์ฐํํฉ๋๋ค. ์:
from snowflake.connector.converter_null import SnowflakeNoConverterToPython con = snowflake.connector.connect( ... converter_class=SnowflakeNoConverterToPython ) for rec in con.cursor().execute("SELECT * FROM large_table"): # rec includes raw Snowflake data
๊ฒฐ๊ณผ์ ์ผ๋ก, ๋ชจ๋ ๋ฐ์ดํฐ๋ ๋ฌธ์์ด ํ์์ผ๋ก ํ์๋์ด ์ ํ๋ฆฌ์ผ์ด์
์ด ๋ค์ดํฐ๋ธ Python ๋ฐ์ดํฐ ํ์
์ผ๋ก ๋ณํ์ ์ํํฉ๋๋ค. ์๋ฅผ ๋ค์ด, TIMESTAMP_NTZ ๋ฐ TIMESTAMP_LTZ ๋ฐ์ดํฐ๋ ๋ฌธ์์ด ํ์์ผ๋ก ํ์๋๋ Epoch ์๊ฐ์ด๋ฉฐ TIMESTAMP_TZ ๋ฐ์ดํฐ๋ Epoch ์๊ฐ ๋ค์์ ๊ณต๋ฐฑ์ด ์ค๊ณ ๊ทธ ๋ค์์ UTC์ ๋ํ ์คํ์
(๋ถ)์ด ํ์๋๋ ๋ฌธ์์ด ํ์์
๋๋ค.
๋ฐ์ธ๋ฉ ๋ฐ์ดํฐ์๋ ์ํฅ์ ์ฃผ์ง ์์ผ๋ฉฐ, ์ฌ์ ํ Python ๋ค์ดํฐ๋ธ ๋ฐ์ดํฐ๋ฅผ ๋ฐ์ธ๋ฉํ์ฌ ์ ๋ฐ์ดํธ์์ ์ฌ์ฉํ ์ ์์ต๋๋ค.
๋ฐ์ดํฐ ๋ค์ด๋ก๋ํ๊ธฐยถ
Snowflake Connector for Python ๋ฒ์ 3.14.0๋ GET ๋ช
๋ น์ผ๋ก Snowflake ์คํ
์ด์ง์ ํ์ผ์ ๋ค์ด๋ก๋ํ ๋ ์ปค๋ฅํฐ๊ฐ ํ์ผ ๊ถํ์ ์ค์ ํ๋ ๋ฐฉ๋ฒ์ ์ง์ ํ๋ unsafe_file_write ์ฐ๊ฒฐ ๋งค๊ฐ ๋ณ์๋ฅผ ๋์
ํ์ต๋๋ค. ์ด๋ฌํ ํ์ผ์ ํญ์ Python ํ๋ก์ธ์ค๋ฅผ ์คํํ๋ ๋์ผํ ์ฌ์ฉ์๊ฐ ์์ ํฉ๋๋ค.
๊ธฐ๋ณธ์ ์ผ๋ก unsafe_file_write ๋งค๊ฐ ๋ณ์๋ False`๋ก ์ค์ ๋์ด ๋์ฑ ์์ ํ๊ณ ์๊ฒฉํ codenowrap:`600 ํ์ผ ๊ถํ์ ์ ๊ณตํฉ๋๋ค. ์ฆ, ์์ ์์๊ฒ๋ง ๋ค์ด๋ก๋ํ ํ์ผ์ ์ฝ๊ธฐ/์ฐ๊ธฐ ๊ถํ์ด ์์ต๋๋ค. ๋ค๋ฅธ ๊ทธ๋ฃน๊ณผ ์ฌ์ฉ์์๊ฒ๋ GET ๋ช
๋ น์ผ๋ก ๋ค์ด๋ก๋ํ ํ์ผ์ ๋ํ ๊ถํ์ด ์์ต๋๋ค.
์กฐ์ง์์ ํ์ผ์ ๋ํด ๋ ์ ํ์ ์ธ ํ์ผ ๊ถํ์ด ํ์ํ ๊ฒฝ์ฐ unsafe_file_write ๋งค๊ฐ ๋ณ์๋ฅผ True ๋ก ์ค์ ํ ์ ์์ต๋๋ค. ์ด ๋งค๊ฐ ๋ณ์๋ฅผ ํ์ฑํํ๋ฉด ์คํ
์ด์ง์์ ๋ค์ด๋ก๋ํ ํ์ผ์ ๋ํ ํ์ผ ๊ถํ์ด 644 ๋ก ์ค์ ๋์ด ์์ ์๋ ํ์ผ์ ์ฝ๊ณ ์ธ ์ ์์ง๋ง ๋ค๋ฅธ ์ฌ๋์ ์ฝ๊ธฐ๋ง ํ ์ ์์ต๋๋ค. ์๋ฅผ ๋ค์ด ๋ค์ด๋ก๋ํ ํ์ผ์ ์ฝ๊ณ ์ฒ๋ฆฌํ ์ ์์ด์ผ ํ๋ ๋ค๋ฅธ ์์คํ
์ฌ์ฉ์ ์๋์์ ์คํ๋๋ ์ผ๋ถ ETL ๋๊ตฌ์ ๊ฒฝ์ฐ ์ด ์ค์ ์ด ํ์ํ ์ ์์ต๋๋ค.
์ด๋ค ๊ฐ์ ์ฌ์ฉํด์ผ ํ ์ง ์ ๋ชจ๋ฅด๊ฒ ๋ค๋ฉด ์กฐ์ง์ ํด๋น ์ ํ๋ฆฌ์ผ์ด์ ๋ณด์ ์ ์ฑ ์ ๋ด๋นํ๋ ํ์ ๋ฌธ์ํ์ญ์์ค.
๋ฐ์ดํฐ ๋ฐ์ธ๋ฉํ๊ธฐยถ
SQL ๋ฌธ์์ ์ฌ์ฉ๋๋ ๊ฐ์ ์ง์ ํ๋ ค๋ฉด, ๋ฌธ์ ๋ฆฌํฐ๋ด์ ํฌํจํ๊ฑฐ๋ ๋ณ์๋ฅผ ๋ฐ์ธ๋ฉ ํ ์ ์์ต๋๋ค. ๋ณ์๋ฅผ ๋ฐ์ธ๋ฉํ ๋ SQL ๋ฌธ์ ํ ์คํธ์ ์๋ฆฌ ํ์์๋ฅผ 1๊ฐ ์ด์ ์ถ๊ฐํ๊ณ ๊ฐ ์๋ฆฌ ํ์์์ ๋ณ์(์ฌ์ฉํ ๋ณ์)๋ฅผ ์ง์ ํฉ๋๋ค.
๋ค์ ์๋ ๋ฆฌํฐ๋ด๊ณผ ๋ฐ์ธ๋ฉ์ ์ฌ์ฉํ ๋๋ฅผ ๋น๊ตํ์ฌ ๋ณด์ฌ์ค๋๋ค.
๋ฆฌํฐ๋ด:
con.cursor().execute("INSERT INTO testtable(col1, col2) VALUES(789, 'test string3')")๋ฐ์ธ๋ฉ:
con.cursor().execute( "INSERT INTO testtable(col1, col2) " "VALUES(%s, %s)", ( 789, 'test string3' ))
์ฐธ๊ณ
๋ฐ์ธ๋ฉํ ์ ์๊ฑฐ๋ ์ผ๊ด ๊ฒฐํฉํ ์ ์๋ ๋ฐ์ดํฐ ํฌ๊ธฐ์ ์ํ์๋ ์ ํ์ด ์์ต๋๋ค. ์์ธํ ๋ด์ฉ์ ์ฟผ๋ฆฌ ํ ์คํธ ํฌ๊ธฐ ์ ํ ์น์ ์ ์ฐธ์กฐํ์ญ์์ค.
Snowflake์์ ์ง์๋๋ ๋ฐ์ธ๋ฉ์ ํ์ ์ ๋ค์๊ณผ ๊ฐ์ต๋๋ค.
pyformat๋ฐformat, ํด๋ผ์ด์ธํธ์์ ๋ฐ์ดํฐ๋ฅผ ๋ฐ์ธ๋ฉํฉ๋๋ค.qmark๋ฐnumeric, ์๋ฒ์์ ๋ฐ์ดํฐ๋ฅผ ๋ฐ์ธ๋ฉํฉ๋๋ค.
์ด์ ๊ด๋ จํ ๊ฐ๊ฐ์ ์ค๋ช ์ ์๋์์ ์ ๊ณต๋ฉ๋๋ค.
pyformat ๋๋ format ๋ฐ์ธ๋ฉยถ
pyformat ๋ฐ์ธ๋ฉ ๋ฐ format ๋ฐ์ธ๋ฉ ๋ชจ๋ ์๋ฒ์ธก์ด ์๋ ํด๋ผ์ด์ธํธ์ธก์์ ๋ฐ์ดํฐ๋ฅผ ๋ฐ์ธ๋ฉํฉ๋๋ค.
๊ธฐ๋ณธ์ ์ผ๋ก, Python์ฉ Snowflake ์ปค๋ฅํฐ๋ pyformat ๋ฐ format ๋ชจ๋๋ฅผ ์ง์ํ๋ฏ๋ก, ์ฌ์ฉ์๋ %(name)s ๋๋ %s ๋ฅผ ์๋ฆฌ ํ์์๋ก ์ฌ์ฉํ ์ ์์ต๋๋ค. ์:
%(name)s๋ฅผ ์๋ฆฌ ํ์์๋ก ์ฌ์ฉ:conn.cursor().execute( "INSERT INTO test_table(col1, col2) " "VALUES(%(col1)s, %(col2)s)", { 'col1': 789, 'col2': 'test string3', })
%s๋ฅผ ์๋ฆฌ ํ์์๋ก ์ฌ์ฉ:con.cursor().execute( "INSERT INTO testtable(col1, col2) " "VALUES(%s, %s)", ( 789, 'test string3' ))
pyformat ๋ฐ format ์ ์ฌ์ฉํ๋ฉด ๋ชฉ๋ก ์ค๋ธ์ ํธ๋ฅผ ์ฌ์ฉํ์ฌ IN ์ฐ์ฐ์๋ฅผ ์ํด ๋ฐ์ดํฐ๋ฅผ ๋ฐ์ธ๋ฉํ ์ ์์ต๋๋ค.
# Binding data for IN operator con.cursor().execute( "SELECT col1, col2 FROM testtable" " WHERE col2 IN (%s)", ( ['test string1', 'test string3'], ))
ํผ์ผํธ ๋ฌธ์(โ%โ)๋ SQL LIKE์ฉ ์์ผ๋์นด๋ ๋ฌธ์ ๋ฐ Python์ฉ ํ์ ๋ฐ์ธ๋ฉ ๋ฌธ์๋ก ์ฌ์ฉํ ์ ์์ต๋๋ค. ํ์ ๋ฐ์ธ๋ฉ์ ์ฌ์ฉํ๊ณ SQL ๋ช ๋ น์ ํผ์ผํธ ๋ฌธ์๊ฐ ํฌํจ๋ ๊ฒฝ์ฐ์๋ ํผ์ผํธ ๋ฌธ์๋ฅผ ์ด์ค์ผ์ดํํด์ผ ํ ์ ์์ต๋๋ค. ์๋ฅผ ๋ค์ด, SQL ๋ฌธ์ด ๋ค์๊ณผ ๊ฐ์ ๊ฒฝ์ฐ:
SELECT col1, col2 FROM test_table WHERE col2 ILIKE '%York' LIMIT 1; -- Find York, New York, etc.
Python ์ฝ๋๋ ๋ค์๊ณผ ๊ฐ์์ผ ํฉ๋๋ค(์๋ณธ ํผ์ผํธ ๊ธฐํธ๋ฅผ ์ด์ค์ผ์ดํํ๋ ค๋ฉด ์ถ๊ฐ ํผ์ผํธ ๊ธฐํธ์ ์ ์).
sql_command = "select col1, col2 from test_table " sql_command += " where col2 like '%%York' limit %(lim)s" parameter_dictionary = {'lim': 1 } cur.execute(sql_command, parameter_dictionary)
qmark ๋๋ numeric ๋ฐ์ธ๋ฉยถ
qmark ๋ฐ์ธ๋ฉ ๋ฐ numeric ๋ฐ์ธ๋ฉ์ ํด๋ผ์ด์ธํธ์ธก์ด ์๋ ์๋ฒ์ธก์์ ๋ฐ์ดํฐ๋ฅผ ๋ฐ์ธ๋ฉํฉ๋๋ค.
qmark๋ฐ์ธ๋ฉ์ ๊ฒฝ์ฐ, ๋ฌผ์ํ ๋ฌธ์(?)๋ฅผ ์ฌ์ฉํ์ฌ ๋ฌธ์์ด์์ ๋ณ์ ๊ฐ์ ์ฝ์ ํ ์์น๋ฅผ ๋ํ๋ ๋๋ค.numeric๋ฐ์ธ๋ฉ์ ๊ฒฝ์ฐ, ์ฝ๋ก (:) ๋ค์ ์ซ์๋ฅผ ์ฌ์ฉํ์ฌ ํด๋น ์์น์ ๋์ฒด๋ ๋ณ์์ ์์น๋ฅผ ๋ํ๋ ๋๋ค. ์๋ฅผ ๋ค์ด,:2๋ ๋ ๋ฒ์งธ ๋ณ์๋ฅผ ์ง์ ํฉ๋๋ค.์ซ์ ๋ฐ์ธ๋ฉ์ ์ฌ์ฉํ์ฌ ๋์ผํ ์ฟผ๋ฆฌ์์ ๋์ผํ ๊ฐ์ ๋ ๋ฒ ์ด์ ๋ฐ์ธ๋ฉํฉ๋๋ค. ์๋ฅผ ๋ค์ด, ๋ ๋ฒ ์ด์ ์ฌ์ฉํ Long VARCHAR ๋๋ BINARY ๋๋ ๋ฐ์ ํ ๊ฐ์ด ์๋ ๊ฒฝ์ฐ
numeric๋ฐ์ธ๋ฉ์ ์ฌ์ฉํ๋ฉด ์๋ฒ์ ๊ฐ์ ํ ๋ฒ ์ ์กํ๊ณ ์ฌ๋ฌ ๋ฒ ์ฌ์ฉํ ์ ์์ต๋๋ค.
๋ค์ ์น์
์์๋ qmark ๋ฐ numeric ๋ฐ์ธ๋ฉ์ ์ฌ์ฉ ๋ฐฉ๋ฒ์ ์ค๋ช
ํฉ๋๋ค.
qmark ๋๋ numeric ๋ฐ์ธ๋ฉ ์ฌ์ฉํ๊ธฐยถ
qmark ๋๋ numeric ์คํ์ผ ๋ฐ์ธ๋ฉ์ ์ฌ์ฉํ๋ ค๋ฉด ๋ค์ ์ค ํ๋๋ฅผ ์คํํ๊ฑฐ๋ connect() ๋ฅผ ํธ์ถํ ๋ ์ฐ๊ฒฐ ๋งค๊ฐ ๋ณ์์ ์ผ๋ถ๋ก paramstyle ์ ์ค์ ํ๋ฉด ๋ฉ๋๋ค.
snowflake.connector.paramstyle='qmark'snowflake.connector.paramstyle='numeric'
paramstyle ์ qmark ๋๋ numeric ์ ์ค์ ํ ๊ฒฝ์ฐ์๋, ? ๋๋ :N (์ฌ๊ธฐ์ N ์ ์ซ์๋ก ๋์ฒด)์ ๊ฐ๊ฐ ์๋ฆฌ ํ์์๋ก ์ฌ์ฉํด์ผ ํฉ๋๋ค.
์:
?๋ฅผ ์๋ฆฌ ํ์์๋ก ์ฌ์ฉ:from snowflake.connector import connect connection_parameters = { 'account': 'xxxxx', 'user': 'xxxx', 'password': 'xxxxxx', "host": "xxxxxx", "port": 443, 'protocol': 'https', 'warehouse': 'xxx', 'database': 'xxx', 'schema': 'xxx', 'paramstyle': 'qmark' # note paramstyle setting here at connection level } con = connect(**connection_parameters) con.cursor().execute( "INSERT INTO testtable2(col1,col2,col3) " "VALUES(?,?,?)", ( 987, 'test string4', ("TIMESTAMP_LTZ", datetime.now()) ) )
:N์ ์๋ฆฌ ํ์์๋ก ์ฌ์ฉ:import snowflake.connector snowflake.connector.paramstyle='numeric' con = snowflake.connector.connect(...) con.cursor().execute( "INSERT INTO testtable(col1, col2) " "VALUES(:1, :2)", ( 789, 'test string3' ))
๋ค์ ์ฟผ๋ฆฌ๋
numeric๋ฐ์ธ๋ฉ์ ์ฌ์ฉํ์ฌ ๋ณ์๋ฅผ ์ฌ์ฌ์ฉํ๋ ๋ฐฉ๋ฒ์ ๋ณด์ฌ์ค๋๋ค.con.cursor().execute( "INSERT INTO testtable(complete_video, short_sample_of_video) " "VALUES(:1, SUBSTRING(:1, :2, :3))", ( binary_value_that_stores_video, # variable :1 starting_offset_in_bytes_of_video_clip, # variable :2 length_in_bytes_of_video_clip # variable :3 ))
datetime ์ค๋ธ์ ํธ์ ํจ๊ป qmark ๋๋ numeric ๋ฐ์ธ๋ฉ ์ฌ์ฉํ๊ธฐยถ
qmark ๋๋ numeric ๋ฐ์ธ๋ฉ์ ์ฌ์ฉํ์ฌ ๋ฐ์ดํฐ๋ฅผ Snowflake TIMESTAMP ๋ฐ์ดํฐ ํ์
์ผ๋ก ๋ฐ์ธ๋ฉํ๋ ๊ฒฝ์ฐ, ๋ฐ์ธ๋ฉ ๋ณ์๋ฅผ Snowflake ํ์์คํฌํ ๋ฐ์ดํฐ ํ์
(TIMESTAMP_LTZ ๋๋ TIMESTAMP_TZ) ๋ฐ ๊ฐ์ ์ง์ ํ๋ ํํ๋ก ์ค์ ํฉ๋๋ค. ์:
import snowflake.connector snowflake.connector.paramstyle='qmark' con = snowflake.connector.connect(...) con.cursor().execute( "CREATE OR REPLACE TABLE testtable2 (" " col1 int, " " col2 string, " " col3 timestamp_ltz" ")" ) con.cursor().execute( "INSERT INTO testtable2(col1,col2,col3) " "VALUES(?,?,?)", ( 987, 'test string4', ("TIMESTAMP_LTZ", datetime.now()) ) )
ํด๋ผ์ด์ธํธ์ธก ๋ฐ์ธ๋ฉ๊ณผ ๋ฌ๋ฆฌ, ์๋ฒ์ธก ๋ฐ์ธ๋ฉ์๋ ์ด์ ๋ํ Snowflake ๋ฐ์ดํฐ ํ์
์ด ํ์ํฉ๋๋ค. ๊ฐ์ฅ ์ผ๋ฐ์ ์ธ Python ๋ฐ์ดํฐ ํ์
์๋ ์ด๋ฏธ Snowflake ๋ฐ์ดํฐ ํ์
์ ๋ํ ์์์ ๋งคํ(์: int ์ด FIXED ๋ก ๋งคํ๋จ)์ด ์์ต๋๋ค. ๊ทธ๋ฌ๋ Python datetime ๋ฐ์ดํฐ๋ ์ฌ๋ฌ Snowflake ๋ฐ์ดํฐ ํ์
์ค 1๊ฐ(TIMESTAMP_NTZ, TIMESTAMP_LTZ ๋๋ TIMESTAMP_TZ)์ ๋ฐ์ธ๋ฉ๋ ์ ์์ผ๋ฉฐ ๊ธฐ๋ณธ ๋งคํ์ TIMESTAMP_NTZ ์ด๋ฏ๋ก ์ฌ์ฉ์๊ฐ ์ฌ์ฉํ Snowflake ๋ฐ์ดํฐ ํ์
์ ์ง์ ํด์ผ ํฉ๋๋ค.
IN ์ฐ์ฐ์์ ํจ๊ป ๋ฐ์ธ๋ฉ ๋ณ์ ์ฌ์ฉํ๊ธฐยถ
qmark ๋ฐ numeric (์๋ฒ์ธก ๋ฐ์ธ๋ฉ)์์๋ IN ์ฐ์ฐ์์ ํจ๊ป ๋ณ์๋ฅผ ๋ฐ์ธ๋ฉํ๋ ๊ฒ์ ์ง์ํ์ง ์์ต๋๋ค.
๋ฐ์ธ๋ฉ ๋ณ์์ IN ์ฐ์ฐ์๋ฅผ ํจ๊ป ์ฌ์ฉํด์ผ ํ๋ ๊ฒฝ์ฐ์๋ ํด๋ผ์ด์ธํธ์ธก ๋ฐ์ธ๋ฉ (pyformat ๋๋ format)์ ์ฌ์ฉํ์ญ์์ค.
์ผ๊ด ์ฝ์ ์ ์ํด ๋งค๊ฐ ๋ณ์๋ฅผ ๋ณ์์ ๋ฐ์ธ๋ฉํ๊ธฐยถ
์ ํ๋ฆฌ์ผ์ด์
์ฝ๋์์๋ ๋จ์ผ ์ผ๊ด ์ฒ๋ฆฌ์ ์ฌ๋ฌ ํ์ ์ฝ์
ํ ์ ์์ต๋๋ค. ์ด ์์
์ ์ํํ๋ ค๋ฉด INSERT ๋ฌธ์์ ๊ฐ์ ๋ํ ๋งค๊ฐ ๋ณ์๋ฅผ ์ฌ์ฉํ์ญ์์ค. ์๋ฅผ ๋ค์ด, ๋ค์ ๋ฌธ์์๋ INSERT ๋ฌธ์์ qmark ๋ฐ์ธ๋ฉ์ ์ํด ์๋ฆฌ ํ์์๋ฅผ ์ฌ์ฉํฉ๋๋ค.
insert into grocery (item, quantity) values (?, ?)
๊ทธ๋ฆฌ๊ณ ์ฝ์ ํ ๋ฐ์ดํฐ๋ฅผ ์ง์ ํ๋ ค๋ฉด ์ํ์ค์ ์ํ์ค(์: ํํ ๋ชฉ๋ก)์ธ ๋ณ์๋ฅผ ์ ์ํฉ๋๋ค.
rows_to_insert = [('milk', 2), ('apple', 3), ('egg', 2)]
์์ ์์์์ ๊ฐ์ด, ๋ชฉ๋ก์ ๊ฐ ํญ๋ชฉ์ ์ฝ์ ํ ํ์ ๋ํ ์ด ๊ฐ์ด ํฌํจ๋ ํํ์ ๋๋ค.
๋ฐ์ธ๋ฉ์ ์คํํ๋ ค๋ฉด executemany() ๋ฉ์๋๋ฅผ ํธ์ถํ์ฌ ๋ณ์๋ฅผ ๋ ๋ฒ์งธ ์ธ์๋ก ์ ๋ฌํฉ๋๋ค. ์:
conn = snowflake.connector.connect( ... ) rows_to_insert = [('milk', 2), ('apple', 3), ('egg', 2)] conn.cursor().executemany( "insert into grocery (item, quantity) values (?, ?)", rows_to_insert)
์๋ฒ์ธก์์ ๋ฐ์ดํฐ๋ฅผ ๋ฐ์ธ๋ฉ (์ฆ, qmark ๋๋ numeric ๋ฐ์ธ๋ฉ)ํ๋ ๊ฒฝ์ฐ, ์ปค๋ฅํฐ๋ ๋ฐ์ธ๋ฉ์ ํตํด ์ผ๊ด ์ฝ์
์ ์ฑ๋ฅ์ ์ต์ ํํ ์ ์์ต๋๋ค.
์ด๋ฌํ ๋ฐฉ์์ ์ฌ์ฉํ์ฌ ๊ฐ์ ๋๋์ผ๋ก ์ฝ์ ํ๋ ๊ฒฝ์ฐ ๋๋ผ์ด๋ฒ๋ ์์ง์ ์ํ ์์ ์คํ ์ด์ง๋ก ๋ฐ์ดํฐ๋ฅผ ์คํธ๋ฆฌ๋ฐํ์ฌ(๋ก์ปฌ ์์คํ ์ ํ์ผ์ ์์ฑํ์ง ์์) ์ฑ๋ฅ์ ํฅ์ํ ์ ์์ต๋๋ค. ๊ฐ์ ๊ฐ์๊ฐ ์๊ณ๊ฐ์ ์ด๊ณผํ๋ ๊ฒฝ์ฐ ๋๋ผ์ด๋ฒ๋ ์๋์ผ๋ก ์ด ์์ ์ ์ํํฉ๋๋ค.
๋ํ, ์ธ์ ์ ํ์ฌ ๋ฐ์ดํฐ๋ฒ ์ด์ค ๋ฐ ์คํค๋ง๋ฅผ ์ค์ ํด์ผ ํฉ๋๋ค. ์ด๋ฌํ ๊ฐ์ด ์ค์ ๋์ง ์์ ๊ฒฝ์ฐ์๋ ๋๋ผ์ด๋ฒ๊ฐ ์คํํ๋ CREATE TEMPORARY STAGE ๋ช ๋ น์์ ๋ค์ ์ค๋ฅ๊ฐ ๋ฐ์ํ๋ฉฐ ์คํจํ ์ ์์ต๋๋ค.
CREATE TEMPORARY STAGE SYSTEM$BIND file_format=(type=csv field_optionally_enclosed_by='"')
Cannot perform CREATE STAGE. This session does not have a current schema. Call 'USE SCHEMA', or use a qualified name.
์ฐธ๊ณ
Snowflake ๋ฐ์ดํฐ๋ฒ ์ด์ค์ ๋ฐ์ดํฐ๋ฅผ ๋ก๋ํ๋ ๋์ฒด ๋ฐฉ๋ฒ(COPY ๋ช ๋ น์ ์ฌ์ฉํ ๋๋ ๋ก๋ ๋ฑ)๊ณผ ๊ด๋ จํด์๋ Snowflake์ ๋ฐ์ดํฐ ๋ก๋ํ๊ธฐ ๋ฅผ ์ฐธ์กฐํ์ญ์์ค.
SQL ์ฝ์ ๊ณต๊ฒฉ ๋ฐฉ์งํ๊ธฐยถ
SQL ์ฝ์ ์ ์ํ์ด ์์ผ๋ฏ๋ก Python์ ํ์ ์ง์ ํจ์๋ฅผ ์ฌ์ฉํ์ฌ ๋ฐ์ดํฐ๋ฅผ ๋ฐ์ธ๋ฉํ์ง ๋ง์์ผ ํฉ๋๋ค. ์:
# Binding data (UNSAFE EXAMPLE) con.cursor().execute( "INSERT INTO testtable(col1, col2) " "VALUES(%(col1)d, '%(col2)s')" % { 'col1': 789, 'col2': 'test string3' })# Binding data (UNSAFE EXAMPLE) con.cursor().execute( "INSERT INTO testtable(col1, col2) " "VALUES(%d, '%s')" % ( 789, 'test string3' ))# Binding data (UNSAFE EXAMPLE) con.cursor().execute( "INSERT INTO testtable(col1, col2) " "VALUES({col1}, '{col2}')".format( col1=789, col2='test string3') )
๋์ ์ ๊ฐ์ ๋ณ์์ ์ ์ฅํ ๋ค์ qmark ๋๋ ์ซ์ ๋ฐ์ธ๋ฉ ์คํ์ผ์ ์ฌ์ฉํ์ฌ ํด๋น ๋ณ์๋ฅผ ๋ฐ์ธ๋ฉํฉ๋๋ค.
์ด ๋ฉํ๋ฐ์ดํฐ ๊ฒ์ํ๊ธฐยถ
๊ฒฐ๊ณผ ์ธํธ์์ ๊ฐ ์ด์ ๋ํ ๋ฉํ๋ฐ์ดํฐ(์: ๊ฐ ์ด์ ์ด๋ฆ, ํ์ , ์ ์ฒด ์๋ฆฟ์, ์์ ์๋ฆฟ์ ๋ฑ)๋ฅผ ๊ฒ์ํ๋ ค๋ฉด ๋ค์ ๋ฐฉ์ ์ค ํ๋๋ฅผ ์ฌ์ฉํ์ญ์์ค.
์ฟผ๋ฆฌ๋ฅผ ์คํํ๊ธฐ ์ํด
execute()๋ฉ์๋๋ฅผ ํธ์ถํ ํ ๋ฉํ๋ฐ์ดํฐ์ ์ก์ธ์คํ๋ ค๋ฉดCursor์ค๋ธ์ ํธ์describe์์ฑ์ ์ฌ์ฉํฉ๋๋ค.์ฟผ๋ฆฌ๋ฅผ ์คํํ ํ์ ์์ด ๋ฉํ๋ฐ์ดํฐ์ ์ก์ธ์คํ๋ ค๋ฉด
describe()๋ฉ์๋๋ฅผ ํธ์ถํฉ๋๋ค.describe๋ฉ์๋๋ Python์ฉ Snowflake ์ปค๋ฅํฐ 2.4.6 ์ด์ ๋ฒ์ ์์ ์ฌ์ฉํ ์ ์์ต๋๋ค.
description ์์ฑ์ ๋ค์ ๊ฐ ์ค 1๊ฐ๋ก ์ค์ ๋ฉ๋๋ค.
2.4.5 ์ดํ ๋ฒ์ : ํํ์ ๋ชฉ๋ก.
2.4.6 ์ด์ ๋ฒ์ : ResultMetadata ์ค๋ธ์ ํธ์ ๋ชฉ๋ก. (
describe๋ฉ์๋๋ ์ด ๋ชฉ๋ก์ ๋ฐํํฉ๋๋ค.)
๊ฐ ํํ ๋ฐ ResultMetadata ์ค๋ธ์ ํธ์๋ ์ด์ ๋ํ ๋ฉํ๋ฐ์ดํฐ(์ด ์ด๋ฆ, ๋ฐ์ดํฐ ํ์
๋ฑ)๊ฐ ํฌํจ๋์ด ์์ต๋๋ค. ๋ฉํ๋ฐ์ดํฐ์๋ ์ธ๋ฑ์ค๋ฅผ ์ฌ์ฉํ์ฌ ๋๋ 2.4.6 ์ด์ ๋ฒ์ ์ ๊ฒฝ์ฐ ResultMetadata ์์ฑ์ ์ฌ์ฉํ์ฌ ์ก์ธ์คํ ์ ์์ต๋๋ค.
๋ค์ ์๋ ๋ฐํ๋ ํํ ๋ฐ ResultMetadata ์ค๋ธ์ ํธ์์ ๋ฉํ๋ฐ์ดํฐ์ ์ก์ธ์คํ๋ ๋ฐฉ๋ฒ์ ๋ณด์ฌ์ค๋๋ค.
์: ์์ธ์ ์ฌ์ฉํ์ฌ ์ด ์ด๋ฆ ๋ฉํ๋ฐ์ดํฐ ๊ฐ์ ธ์ค๊ธฐ(2.4.5 ๋ฐ ์ด์ ๋ฒ์ ):
๋ค์ ์์์๋ description ์์ฑ์ ์ฌ์ฉํ์ฌ ์ฟผ๋ฆฌ๋ฅผ ์คํํ ํ ์ด ์ด๋ฆ ๋ชฉ๋ก์ ๊ฒ์ํฉ๋๋ค. ์ด ์์ฑ์ ํํ์ ๋ชฉ๋ก์ผ๋ก, ์ด ์์์๋ ๊ฐ ํํ์ ์ฒซ ๋ฒ์งธ ๊ฐ์์ ์ด ์ด๋ฆ์ ์ก์ธ์คํฉ๋๋ค.
cur = conn.cursor() cur.execute("SELECT * FROM test_table") print(','.join([col[0] for col in cur.description]))
์: ์์ฑ์ ์ฌ์ฉํ์ฌ ์ด ์ด๋ฆ ๋ฉํ๋ฐ์ดํฐ ๊ฐ์ ธ์ค๊ธฐ(2.4.6 ์ด์ ๋ฒ์ ):
๋ค์ ์์์๋ description ์์ฑ์ ์ฌ์ฉํ์ฌ ์ฟผ๋ฆฌ๋ฅผ ์คํํ ํ ์ด ์ด๋ฆ ๋ชฉ๋ก์ ๊ฒ์ํฉ๋๋ค. ์ด ์์ฑ์ ResultMetaData ์ค๋ธ์ ํธ์ ๋ชฉ๋ก์ผ๋ก, ์ด ์์์๋ ๊ฐ ResultMetadata ์ค๋ธ์ ํธ์ name ์์ฑ์์ ์ด ์ด๋ฆ์ ์ก์ธ์คํฉ๋๋ค.
cur = conn.cursor() cur.execute("SELECT * FROM test_table") print(','.join([col.name for col in cur.description]))
์: ์ฟผ๋ฆฌ๋ฅผ ์คํํ์ง ์๊ณ ์ด ์ด๋ฆ ๋ฉํ๋ฐ์ดํฐ ๊ฐ์ ธ์ค๊ธฐ(2.4.6 ์ด์ ๋ฒ์ ):
๋ค์ ์์์๋ describe ๋ฉ์๋๋ฅผ ์ฌ์ฉํ์ฌ ์ฟผ๋ฆฌ๋ฅผ ์คํํ์ง ์๊ณ ์ด ์ด๋ฆ ๋ชฉ๋ก์ ๊ฒ์ํฉ๋๋ค. describe() ๋ฉ์๋๋ ResultMetaData ์ค๋ธ์ ํธ์ ๋ชฉ๋ก์ ๋ฐํํ๋๋ฐ, ์ด ์์์๋ ๊ฐ ResultMetadata ์ค๋ธ์ ํธ์ name ์์ฑ์์ ์ด ์ด๋ฆ์ ์ก์ธ์คํฉ๋๋ค.
cur = conn.cursor() result_metadata_list = cur.describe("SELECT * FROM test_table") print(','.join([col.name for col in result_metadata_list]))
์ค๋ฅ ์ฒ๋ฆฌยถ
์ ํ๋ฆฌ์ผ์ด์ ์ Snowflake ์ปค๋ฅํฐ์์ ๋ฐ์ํ ์์ธ๋ฅผ ์ฌ๋ฐ๋ฅด๊ฒ ์ฒ๋ฆฌํ๊ณ ์ฝ๋ ์คํ์ ๊ณ์ ๋๋ ์ค์ง ์ฌ๋ถ๋ฅผ ๊ฒฐ์ ํฉ๋๋ค.
# Catching the syntax error cur = con.cursor() try: cur.execute("SELECT * FROM testtable") except snowflake.connector.errors.ProgrammingError as e: # default error message print(e) # customer error message print('Error {0} ({1}): {2} ({3})'.format(e.errno, e.sqlstate, e.msg, e.sfqid)) finally: cur.close()
execute_stream ์ ์ฌ์ฉํ์ฌ SQL ์คํฌ๋ฆฝํธ ์คํํ๊ธฐยถ
execute_stream ํจ์๋ฅผ ์ฌ์ฉํ๋ฉด ์คํธ๋ฆผ์์ 1๊ฐ ์ด์์ SQL ์คํฌ๋ฆฝํธ๋ฅผ ์คํํ ์ ์์ต๋๋ค.
from codecs import open with open(sqlfile, 'r', encoding='utf-8') as f: for cur in con.execute_stream(f): for ret in cur: print(ret)
์ฐธ๊ณ
sql_stream ์ ์ฃผ์์ด ํฌํจ๋์ด ์๋ ๊ฒฝ์ฐ ์ถ๊ฐ ๊ตฌ์ฑ์ด ํ์ํ ์ ์์ต๋๋ค. execute_stream์ ์ฌ์ฉํ์ฌ SQL ์คํฌ๋ฆฝํธ ์คํํ๊ธฐ ์น์
์ ์ฐธ์กฐํ์ญ์์ค.
์ฐ๊ฒฐ ์ข ๋ฃํ๊ธฐยถ
close ๋ฉ์๋๋ฅผ ํธ์ถํ์ฌ ์ฐ๊ฒฐ์ ์ข
๋ฃํ๋ ๊ฒ์ด ๋ชจ๋ฒ ์ฌ๋ก์
๋๋ค.
connection.close()
์ด๋ฅผ ํตํด ์์ง๋ ํด๋ผ์ด์ธํธ ๋ฉํธ๋ฆญ์ ์๋ฒ๋ก ์ ์ถํ๊ณ ์ธ์
์ ์ญ์ ํ ์ ์์ต๋๋ค. ๋ํ, try-finally ๋ฅผ ์ฌ์ฉํ๋ฉด ์ค๊ฐ์ ์์ธ๊ฐ ๋ฐ์ํ๋ ๊ฒฝ์ฐ์๋ ์ฐ๊ฒฐ์ด ์ข
๋ฃ๋์ง ์๋๋ก ํ ์ ์์ต๋๋ค.
# Connecting to Snowflake con = snowflake.connector.connect(...) try: # Running queries con.cursor().execute(...) ... finally: # Closing the connection con.close()
์กฐ์ฌ
๋ซํ์ง ์์ ์ฐ๊ฒฐ์ด ์ฌ๋ฌ ๊ฐ ์์ผ๋ฉด ์์คํ ๋ฆฌ์์ค๊ฐ ๊ณ ๊ฐ๋์ด ๊ฒฐ๊ตญ ์ ํ๋ฆฌ์ผ์ด์ ์ถฉ๋์ด ๋ฐ์ํ ์ ์์ต๋๋ค.
์ปจํ ์คํธ ๊ด๋ฆฌ์๋ฅผ ์ฌ์ฉํ ํธ๋์ญ์ ์ฐ๊ฒฐ ๋ฐ ๊ด๋ฆฌํ๊ธฐยถ
Python์ฉ Snowflake ์ปค๋ฅํฐ๋ ํ์ํ ๊ฒฝ์ฐ ๋ฆฌ์์ค๋ฅผ ํ ๋น ๋ฐ ํด์ ํ๋ ์ปจํ
์คํธ ๊ด๋ฆฌ์๋ฅผ ์ง์ํฉ๋๋ค. ์ปจํ
์คํธ ๊ด๋ฆฌ์๋ autocommit ์ด ๋นํ์ฑํ๋ ๊ฒฝ์ฐ ๋ฌธ์ ์ํ์ ๋ฐ๋ผ ํธ๋์ญ์
์ ์ปค๋ฐ ๋๋ ๋กค๋ฐฑํ๋ ๋ฐ ์ ์ฉํฉ๋๋ค.
# Connecting to Snowflake using the context manager with snowflake.connector.connect( user=USER, password=PASSWORD, account=ACCOUNT, autocommit=False, ) as con: con.cursor().execute("INSERT INTO a VALUES(1, 'test1')") con.cursor().execute("INSERT INTO a VALUES(2, 'test2')") con.cursor().execute("INSERT INTO a VALUES(not numeric value, 'test3')") # fail
์์ ์์์, ์ธ ๋ฒ์งธ ๋ฌธ์ด ์คํจํ๋ฉด ์ปจํ ์คํธ ๊ด๋ฆฌ์๊ฐ ํธ๋์ญ์ ์ ๋ณ๊ฒฝ ์ฌํญ์ ๋กค๋ฐฑํ๊ณ ์ฐ๊ฒฐ์ ๋์ต๋๋ค. ๋ชจ๋ ๋ฌธ์ด ์ฑ๊ณตํ๋ฉด ์ปจํ ์คํธ ๊ด๋ฆฌ์๊ฐ ๋ณ๊ฒฝ ์ฌํญ์ ์ปค๋ฐํ๊ณ ์ฐ๊ฒฐ์ ๋์ต๋๋ค.
try ๋ฐ except ๋ธ๋ก์ ํด๋นํ๋ ์ฝ๋๋ ๋ค์๊ณผ ๊ฐ์ต๋๋ค.
# Connecting to Snowflake using try and except blocks con = snowflake.connector.connect( user=USER, password=PASSWORD, account=ACCOUNT, autocommit=False) try: con.cursor().execute("INSERT INTO a VALUES(1, 'test1')") con.cursor().execute("INSERT INTO a VALUES(2, 'test2')") con.cursor().execute("INSERT INTO a VALUES(not numeric value, 'test3')") # fail con.commit() except Exception as e: con.rollback() raise e finally: con.close()
VECTOR ๋ฐ์ดํฐ ํ์ ์ฌ์ฉํ๊ธฐยถ
VECTOR ๋ฐ์ดํฐ ํ์ ์ ๋ํ ์ง์์ ๋ฒ์ 3.6.0์ Snowflake Python Connector์ ๋์ ๋์์ต๋๋ค. VECTOR ๋ฐ์ดํฐ ํ์ ์ ๋ฒกํฐ ์ ์ฌ์ฑ ํจ์ ์ ํจ๊ป ์ฌ์ฉํ์ฌ ๋ฒกํฐ ๊ฒ์ ๋๋ RAG (retrieval-augmented-generation)๋ฅผ ๊ธฐ๋ฐ์ผ๋ก ํ๋ ์ ํ๋ฆฌ์ผ์ด์ ์ ๊ตฌํํ ์ ์์ต๋๋ค.
๋ค์ ์ฝ๋ ์์ ๋ Python ์ปค๋ฅํฐ๋ฅผ ์ฌ์ฉํ์ฌ VECTOR ์ด์ด ์๋ ํ ์ด๋ธ์ ์์ฑํ๊ณ VECTOR_INNER_PRODUCT ํจ์๋ฅผ ํธ์ถํ๋ ๋ฐฉ๋ฒ์ ๋ณด์ฌ์ค๋๋ค.
import snowflake.connector
conn = ... # Set up connection
cur = conn.cursor()
# Create a table and insert some vectors
cur.execute("CREATE OR REPLACE TABLE vectors (a VECTOR(FLOAT, 3), b VECTOR(FLOAT, 3))")
values = [([1.1, 2.2, 3], [1, 1, 1]), ([1, 2.2, 3], [4, 6, 8])]
for row in values:
cur.execute(f"""
INSERT INTO vectors(a, b)
SELECT {row[0]}::VECTOR(FLOAT,3), {row[1]}::VECTOR(FLOAT,3)
""")
# Compute the pairwise inner product between columns a and b
cur.execute("SELECT VECTOR_INNER_PRODUCT(a, b) FROM vectors")
print(cur.fetchall())
[(6.30...,), (41.2...,)]
๋ค์ ์ฝ๋ ์์ ์์๋ Python Connector๋ฅผ ์ฌ์ฉํ์ฌ [1,2,3] ์ ๊ฐ์ฅ ๊ฐ๊น์ด ๋ฒกํฐ๋ฅผ ์ฐพ๊ธฐ ์ํด VECTOR_COSINE_SIMILARITY ๋ฅผ ํธ์ถํ๋ ๋ฐฉ๋ฒ์ ๋ณด์ฌ์ค๋๋ค.
cur.execute(f"""
SELECT a, VECTOR_COSINE_SIMILARITY(a, {[1,2,3]}::VECTOR(FLOAT, 3))
AS similarity
FROM vectors
ORDER BY similarity DESC
LIMIT 1;
""")
print(cur.fetchall())
[([1.0, 2.2..., 3.0], 0.9990...)]
์ฐธ๊ณ
VECTOR ๋ฐ์ดํฐ ํ์ ์๋ ๋ณ์ ๋ฐ์ธ๋ฉ์ด ์ง์๋์ง ์์ต๋๋ค.
๋ก๊น ยถ
Python์ฉ Snowflake ์ปค๋ฅํฐ๋ ํ์ค Python logging ๋ชจ๋์ ์ฌ์ฉํ์ฌ ์ผ์ ๊ฐ๊ฒฉ์ผ๋ก ์ํ๋ฅผ ๊ธฐ๋กํจ์ผ๋ก์จ ์ ํ๋ฆฌ์ผ์ด์
์ด ๋ฐฑ๊ทธ๋ผ์ด๋๋ก ์คํ๋๋ ํ๋์ ์ถ์ ํ ์ ์๋๋ก ํด์ค๋๋ค. ๋ก๊น
์ ํ์ฑํํ๋ ๊ฐ์ฅ ๊ฐ๋จํ ๋ฐฉ๋ฒ์ ์ ํ๋ฆฌ์ผ์ด์
์ ์์ํ ๋ logging.basicConfig() ๋ฅผ ํธ์ถํ๋ ๊ฒ์
๋๋ค.
์๋ฅผ ๋ค์ด, ๋ก๊น
์์ค์ INFO ๋ก ์ค์ ํ๊ณ /tmp/snowflake_python_connector.log ํ์ผ์ ๋ก๊ทธ๋ฅผ ์ ์ฅํ๋ ค๋ฉด:
logging.basicConfig( filename=file_name, level=logging.INFO)
๋ณด๋ค ํฌ๊ด์ ์ธ ๋ก๊น
์ ๋ค์๊ณผ ๊ฐ์ด ๋ก๊น
์์ค์ DEBUG ๋ก ์ค์ ํ์ฌ ํ์ฑํํ ์ ์์ต๋๋ค.
# Logging including the timestamp, thread and the source code location import logging for logger_name in ['snowflake.connector', 'botocore', 'boto3']: logger = logging.getLogger(logger_name) logger.setLevel(logging.DEBUG) ch = logging.FileHandler('/tmp/python_connector.log') ch.setLevel(logging.DEBUG) ch.setFormatter(logging.Formatter('%(asctime)s - %(threadName)s %(filename)s:%(lineno)d - %(funcName)s() - %(levelname)s - %(message)s')) logger.addHandler(ch)์ ํ ์ฌํญ์ด์ง๋ง ๊ถ์ฅ๋๋ SecretDetector ํฌ๋งทํฐ ํด๋์ค๋ฅผ ์ฌ์ฉํ๋ฉด ์๋ ค์ง ๋ฏผ๊ฐํ ์ ๋ณด ์ธํธ๋ฅผ ๋ง์คํนํ ํ Snowflake Python Connector ๋ก๊ทธ ํ์ผ์ ์์ฑํ ์ ์์ต๋๋ค. SecretDetector๋ฅผ ์ฌ์ฉํ๋ ค๋ฉด, ๋ค์๊ณผ ์ ์ฌํ ์ฝ๋๋ฅผ ์ฌ์ฉํฉ๋๋ค.
# Logging including the timestamp, thread and the source code location import logging from snowflake.connector.secret_detector import SecretDetector for logger_name in ['snowflake.connector', 'botocore', 'boto3']: logger = logging.getLogger(logger_name) logger.setLevel(logging.DEBUG) ch = logging.FileHandler('/tmp/python_connector.log') ch.setLevel(logging.DEBUG) ch.setFormatter(SecretDetector('%(asctime)s - %(threadName)s %(filename)s:%(lineno)d - %(funcName)s() - %(levelname)s - %(message)s')) logger.addHandler(ch)์ฐธ๊ณ
botocore๋ฐboto3๋ Python์ฉ AWS(Amazon Web Services) SDK๋ฅผ ํตํด ์ฌ์ฉํ ์ ์์ต๋๋ค.
๊ตฌ์ฑ ํ์ผ ๋ก๊ทธํ๊ธฐยถ
๋๋ ๋ก๊ทธ ์์ค๊ณผ config.toml ๊ตฌ์ฑ ํ์ผ์ ๋ก๊ทธ ํ์ผ์ ์ ์ฅํ ๋๋ ํฐ๋ฆฌ๋ฅผ ์์ฝ๊ฒ ์ง์ ํ ์ ์์ต๋๋ค. ์ด ํ์ผ์ ๋ํ ์์ธํ ๋ด์ฉ์ connections.toml ํ์ผ์ ์ฌ์ฉํ์ฌ ์ฐ๊ฒฐํ๊ธฐ ์น์
์ ์ฐธ์กฐํ์ญ์์ค.
์ฐธ๊ณ
์ด ๋ก๊น ๊ตฌ์ฑ ๊ธฐ๋ฅ์ Python ๋ก๊น ๋ฌธ์์ ์ ์๋ ๋๋ก ๋ก๊ทธ ์์ค์ ์ง์ํฉ๋๋ค.
๋ก๊น ์์ค์ ๋ํ ์์ธํ ๋ด์ฉ์ Python ๊ธฐ๋ณธ ๋ก๊น ์์ต์ ๋ฅผ ์ฐธ์กฐํ์ญ์์ค.
์ด ๋ก๊น
๊ตฌ์ฑ ํ์ผ์์๋ toml์ ์ฌ์ฉํ์ฌ ๋ค์๊ณผ ๊ฐ์ด save_logs, level, path ๋ก๊น
๋งค๊ฐ ๋ณ์๋ฅผ ์ ์ํฉ๋๋ค.
[log]
save_logs = true
level = "INFO"
path = "<directory to store logs>"
์ฌ๊ธฐ์,
save_logs๋ ๋ก๊ทธ๋ฅผ ์ ์ฅํ ์ง ์ฌ๋ถ๋ฅผ ๊ฒฐ์ ํฉ๋๋ค.level์ ๋ก๊น ์์ค์ ์ง์ ํฉ๋๋ค. ์ ์ํ์ง ์์ผ๋ฉด ๋๋ผ์ด๋ฒ์ ๊ธฐ๋ณธ๊ฐ์INFO์ ๋๋ค.``path``๋ ๋ก๊ทธ ํ์ผ์ ์ ์ฅํ ๋๋ ํฐ๋ฆฌ๋ฅผ ์๋ณํฉ๋๋ค. ์ ์๋์ง ์์ ๊ฒฝ์ฐ ๋๋ผ์ด๋ฒ์์ ๊ธฐ๋ณธ
$SNOWFLAKE_HOME/logs/๋๋ ํฐ๋ฆฌ์ ๋ก๊ทธ๋ฅผ ์ ์ฅํฉ๋๋ค.
์ฐธ๊ณ
config.toml ํ์ผ์ [log] ์น์
์ด ํฌํจ๋์ด ์์ง ์์ผ๋ฉด ๋ก๊ทธ ๋ฉ์์ง๊ฐ ์ ์ฅ๋์ง ์์ต๋๋ค.
ํ๋ฃจ ๋์์ ๋ก๊ทธ ๋ฉ์์ง๋ python-connector.log ํ์ผ์ ์ถ๊ฐ๋๋ฉฐ, ์ด ํ์ผ์ ๋์ค์ python-connector.log.YYYY-MM-DD ๋ก ์ด๋ฆ์ด ๋ณ๊ฒฝ๋ฉ๋๋ค.
์ํ ํ๋ก๊ทธ๋จยถ
๋ค์ ์ํ ์ฝ๋์์๋ ์ด์ ์น์ ์์ ์ค๋ช ํ ์ฌ๋ฌ ์๋ฅผ ์๋ํ๋ Python ํ๋ก๊ทธ๋จ์ ํตํฉํฉ๋๋ค. ์ด ์์๋ ๋ค์์ ๋ ๋ถ๋ถ์ด ํฌํจ๋ฉ๋๋ค.
์์ ํด๋์ค(โpython_veritas_baseโ)์๋ ์๋ฒ ์ฐ๊ฒฐ๊ณผ ๊ฐ์ ์ฌ๋ฌ ์ผ๋ฐ ์์ ์ ์ํ ์ฝ๋๊ฐ ํฌํจ๋ฉ๋๋ค.
ํ์ ํด๋์ค(โpython_connector_exampleโ)๋ ํ ์ด๋ธ ์ฟผ๋ฆฌ์ ๊ฐ์ ํน์ ํด๋ผ์ด์ธํธ๋ฅผ ์ํ ์ฌ์ฉ์ ์ง์ ๋ถ๋ถ์ ๋ํ๋ ๋๋ค.
์ด ์ํ ์ฝ๋๋ ์ต์ ์ ํ ๋น๋์์์ ์คํ์ ๋ณด์ฅํ๊ธฐ ์ํด ํ ์คํธ ์ค 1๊ฐ์์ ์ง์ ๊ฐ์ ธ์จ ์ฝ๋์ ๋๋ค.
์ด ์ฝ๋๋ ํ ์คํธ์์ ๊ฐ์ ธ์จ ๊ฒ์ด๋ฏ๋ก ์ผ๋ถ ํ ์คํธ์์ ์ฌ์ฉ๋๋ ๋์ฒด ํฌํธ ๋ฐ ํ๋กํ ์ฝ์ ์ค์ ํ๊ธฐ ์ํ ์๋์ ์ฝ๋๊ฐ ํฌํจ๋์ด ์์ต๋๋ค. ์ฌ์ฉ์๋ ํ๋กํ ์ฝ ๋๋ ํฌํธ ๋ฒํธ๋ฅผ ์ค์ ํ์ง ์์์ผ ํ๋ฉฐ, ๋์ ์ด๋ฌํ ๋จ๊ณ๋ฅผ ์๋ตํ๊ณ ๊ธฐ๋ณธ๊ฐ์ ์ฌ์ฉํด์ผ ํฉ๋๋ค.
์ฌ๊ธฐ์๋ ๋ฌธ์์ ๋ณ๋๋ก ๊ฐ์ ธ์ฌ ์ ์๋ ์ฝ๋๋ฅผ ๋ํ๋ด๊ธฐ ์ํ ์น์ ๋ง์ปค(โ์ฝ๋ ์กฐ๊ฐ ํ๊ทธโ๋ผ๊ณ ๋ ํจ)๋ ํฌํจ๋์ด ์์ต๋๋ค. ์น์ ๋ง์ปค๋ ์ผ๋ฐ์ ์ผ๋ก ๋ค์๊ณผ ๊ฐ์ต๋๋ค.
# -- (> ---------------------- SECTION=import_connector ---------------------
...
# -- <) ---------------------------- END_SECTION ----------------------------
์ด๋ฌํ ์น์ ๋ง์ปค๋ ์ฌ์ฉ์ ์ฝ๋์์ ํ์๊ฐ ์๋๋๋ค.
์ฝ๋ ์ํ์ ์ฒซ ๋ฒ์งธ ๋ถ๋ถ์๋ ๋ค์์ ์ํํ๊ธฐ ์ํ ๊ณตํต ์๋ธ๋ฃจํด์ด ํฌํจ๋์ด ์์ต๋๋ค.
์ฐ๊ฒฐ ์ ๋ณด๊ฐ ํฌํจ๋ ๋ช ๋ น์ค ์ธ์(์: โโwarehouse MyWarehouseโ) ์ฝ๊ธฐ.
์๋ฒ์ ์ฐ๊ฒฐํ๊ธฐ.
์จ์ดํ์ฐ์ค, ๋ฐ์ดํฐ๋ฒ ์ด์ค ๋ฐ ์คํค๋ง ๋ง๋ค๊ธฐ ๋ฐ ์ฌ์ฉํ๊ธฐ.
์ฌ์ฉ ์๋ฃ ์ ์คํค๋ง, ๋ฐ์ดํฐ๋ฒ ์ด์ค ๋ฐ ์จ์ดํ์ฐ์ค ์ญ์ ํ๊ธฐ.
import logging
import os
import sys
# -- (> ---------------------- SECTION=import_connector ---------------------
import snowflake.connector
# -- <) ---------------------------- END_SECTION ----------------------------
class python_veritas_base:
"""
PURPOSE:
This is the Base/Parent class for programs that use the Snowflake
Connector for Python.
This class is intended primarily for:
* Sample programs, e.g. in the documentation.
* Tests.
"""
def __init__(self, p_log_file_name = None):
"""
PURPOSE:
This does any required initialization steps, which in this class is
basically just turning on logging.
"""
file_name = p_log_file_name
if file_name is None:
file_name = '/tmp/snowflake_python_connector.log'
# -- (> ---------- SECTION=begin_logging -----------------------------
logging.basicConfig(
filename=file_name,
level=logging.INFO)
# -- <) ---------- END_SECTION ---------------------------------------
# -- (> ---------------------------- SECTION=main ------------------------
def main(self, argv):
"""
PURPOSE:
Most tests follow the same basic pattern in this main() method:
* Create a connection.
* Set up, e.g. use (or create and use) the warehouse, database,
and schema.
* Run the queries (or do the other tasks, e.g. load data).
* Clean up. In this test/demo, we drop the warehouse, database,
and schema. In a customer scenario, you'd typically clean up
temporary tables, etc., but wouldn't drop your database.
* Close the connection.
"""
# Read the connection parameters (e.g. user ID) from the command line
# and environment variables, then connect to Snowflake.
connection = self.create_connection(argv)
# Set up anything we need (e.g. a separate schema for the test/demo).
self.set_up(connection)
# Do the "real work", for example, create a table, insert rows, SELECT
# from the table, etc.
self.do_the_real_work(connection)
# Clean up. In this case, we drop the temporary warehouse, database, and
# schema.
self.clean_up(connection)
print("\nClosing connection...")
# -- (> ------------------- SECTION=close_connection -----------------
connection.close()
# -- <) ---------------------------- END_SECTION ---------------------
# -- <) ---------------------------- END_SECTION=main --------------------
def args_to_properties(self, args):
"""
PURPOSE:
Read the command-line arguments and store them in a dictionary.
Command-line arguments should come in pairs, e.g.:
"--user MyUser"
INPUTS:
The command line arguments (sys.argv).
RETURNS:
Returns the dictionary.
DESIRABLE ENHANCEMENTS:
Improve error detection and handling.
"""
connection_parameters = {}
i = 1
while i < len(args) - 1:
property_name = args[i]
# Strip off the leading "--" from the tag, e.g. from "--user".
property_name = property_name[2:]
property_value = args[i + 1]
connection_parameters[property_name] = property_value
i += 2
return connection_parameters
def create_connection(self, argv):
"""
PURPOSE:
This gets account identifier and login information from the
environment variables and command-line parameters, connects to the
server, and returns the connection object.
INPUTS:
argv: This is usually sys.argv, which contains the command-line
parameters. It could be an equivalent substitute if you get
the parameter information from another source.
RETURNS:
A connection.
"""
# Get account identifier and login information from environment variables and command-line parameters.
# For information about account identifiers, see
# https://docs.snowflake.com/en/user-guide/admin-account-identifier.html .
# -- (> ----------------------- SECTION=set_login_info ---------------
# Get the password from an appropriate environment variable, if
# available.
PASSWORD = os.getenv('SNOWSQL_PWD')
# Get the other login info etc. from the command line.
if len(argv) < 11:
msg = "ERROR: Please pass the following command-line parameters:\n"
msg += "--warehouse <warehouse> --database <db> --schema <schema> "
msg += "--user <user> --account <account_identifier> "
print(msg)
sys.exit(-1)
else:
connection_parameters = self.args_to_properties(argv)
USER = connection_parameters["user"]
ACCOUNT = connection_parameters["account"]
WAREHOUSE = connection_parameters["warehouse"]
DATABASE = connection_parameters["database"]
SCHEMA = connection_parameters["schema"]
# Optional: for internal testing only.
try:
PORT = connection_parameters["port"]
except:
PORT = ""
try:
PROTOCOL = connection_parameters["protocol"]
except:
PROTOCOL = ""
# If the password is set by both command line and env var, the
# command-line value takes precedence over (is written over) the
# env var value.
# If the password wasn't set either in the environment var or on
# the command line...
if PASSWORD is None or PASSWORD == '':
print("ERROR: Set password, e.g. with SNOWSQL_PWD environment variable")
sys.exit(-2)
# -- <) ---------------------------- END_SECTION ---------------------
# Optional diagnostic:
#print("USER:", USER)
#print("ACCOUNT:", ACCOUNT)
#print("WAREHOUSE:", WAREHOUSE)
#print("DATABASE:", DATABASE)
#print("SCHEMA:", SCHEMA)
#print("PASSWORD:", PASSWORD)
#print("PROTOCOL:" "'" + PROTOCOL + "'")
#print("PORT:" + "'" + PORT + "'")
print("Connecting...")
# If the PORT is set but the protocol is not, we ignore the PORT (bug!!).
if PROTOCOL is None or PROTOCOL == "" or PORT is None or PORT == "":
# -- (> ------------------- SECTION=connect_to_snowflake ---------
conn = snowflake.connector.connect(
user=USER,
password=PASSWORD,
account=ACCOUNT,
warehouse=WAREHOUSE,
database=DATABASE,
schema=SCHEMA
)
# -- <) ---------------------------- END_SECTION -----------------
else:
conn = snowflake.connector.connect(
user=USER,
password=PASSWORD,
account=ACCOUNT,
warehouse=WAREHOUSE,
database=DATABASE,
schema=SCHEMA,
# Optional: for internal testing only.
protocol=PROTOCOL,
port=PORT
)
return conn
def set_up(self, connection):
"""
PURPOSE:
Set up to run a test. You can override this method with one
appropriate to your test/demo.
"""
# Create a temporary warehouse, database, and schema.
self.create_warehouse_database_and_schema(connection)
def do_the_real_work(self, conn):
"""
PURPOSE:
Your sub-class should override this to include the code required for
your documentation sample or your test case.
This default method does a very simple self-test that shows that the
connection was successful.
"""
# Create a cursor for this connection.
cursor1 = conn.cursor()
# This is an example of an SQL statement we might want to run.
command = "SELECT PI()"
# Run the statement.
cursor1.execute(command)
# Get the results (should be only one):
for row in cursor1:
print(row[0])
# Close this cursor.
cursor1.close()
def clean_up(self, connection):
"""
PURPOSE:
Clean up after a test. You can override this method with one
appropriate to your test/demo.
"""
# Create a temporary warehouse, database, and schema.
self.drop_warehouse_database_and_schema(connection)
def create_warehouse_database_and_schema(self, conn):
"""
PURPOSE:
Create the temporary schema, database, and warehouse that we use
for most tests/demos.
"""
# Create a database, schema, and warehouse if they don't already exist.
print("\nCreating warehouse, database, schema...")
# -- (> ------------- SECTION=create_warehouse_database_schema -------
conn.cursor().execute("CREATE WAREHOUSE IF NOT EXISTS tiny_warehouse_mg")
conn.cursor().execute("CREATE DATABASE IF NOT EXISTS testdb_mg")
conn.cursor().execute("USE DATABASE testdb_mg")
conn.cursor().execute("CREATE SCHEMA IF NOT EXISTS testschema_mg")
# -- <) ---------------------------- END_SECTION ---------------------
# -- (> --------------- SECTION=use_warehouse_database_schema --------
conn.cursor().execute("USE WAREHOUSE tiny_warehouse_mg")
conn.cursor().execute("USE DATABASE testdb_mg")
conn.cursor().execute("USE SCHEMA testdb_mg.testschema_mg")
# -- <) ---------------------------- END_SECTION ---------------------
def drop_warehouse_database_and_schema(self, conn):
"""
PURPOSE:
Drop the temporary schema, database, and warehouse that we create
for most tests/demos.
"""
# -- (> ------------- SECTION=drop_warehouse_database_schema ---------
conn.cursor().execute("DROP SCHEMA IF EXISTS testschema_mg")
conn.cursor().execute("DROP DATABASE IF EXISTS testdb_mg")
conn.cursor().execute("DROP WAREHOUSE IF EXISTS tiny_warehouse_mg")
# -- <) ---------------------------- END_SECTION ---------------------
# ----------------------------------------------------------------------------
if __name__ == '__main__':
pvb = python_veritas_base()
pvb.main(sys.argv)
์ฝ๋ ์ํ์ ๋ ๋ฒ์งธ ๋ถํ์์๋ ํ ์ด๋ธ์ ์์ฑํ๊ณ ํ ์ด๋ธ์ ํ์ ์ฝ์ ํ๋ ๋ฑ์ ์์ ์ ์ํํฉ๋๋ค.
import sys
# -- (> ---------------------- SECTION=import_connector ---------------------
import snowflake.connector
# -- <) ---------------------------- END_SECTION ----------------------------
# Import the base class that contains methods used in many tests and code
# examples.
from python_veritas_base import python_veritas_base
class python_connector_example (python_veritas_base):
"""
PURPOSE:
This is a simple example program that shows how to use the Snowflake
Python Connector to create and query a table.
"""
def __init__(self):
pass
def do_the_real_work(self, conn):
"""
INPUTS:
conn is a Connection object returned from snowflake.connector.connect().
"""
print("\nCreating table test_table...")
# -- (> ----------------------- SECTION=create_table ---------------------
conn.cursor().execute(
"CREATE OR REPLACE TABLE "
"test_table(col1 integer, col2 string)")
conn.cursor().execute(
"INSERT INTO test_table(col1, col2) VALUES " +
" (123, 'test string1'), " +
" (456, 'test string2')")
# -- <) ---------------------------- END_SECTION -------------------------
print("\nSelecting from test_table...")
# -- (> ----------------------- SECTION=querying_data --------------------
cur = conn.cursor()
try:
cur.execute("SELECT col1, col2 FROM test_table ORDER BY col1")
for (col1, col2) in cur:
print('{0}, {1}'.format(col1, col2))
finally:
cur.close()
# -- <) ---------------------------- END_SECTION -------------------------
# ============================================================================
if __name__ == '__main__':
test_case = python_connector_example()
test_case.main(sys.argv)
์ด ์ํ์ ์คํํ๋ ค๋ฉด ๋ค์์ ์ํํด์ผ ํฉ๋๋ค.
์ฝ๋์ ์ฒซ ๋ฒ์งธ ๋ถ๋ถ์ โpython_veritas_base.pyโ ํ์ผ๋ก ๋ณต์ฌํฉ๋๋ค.
์ฝ๋์ ๋ ๋ฒ์งธ ๋ถ๋ถ์ โpython_connector_example.pyโ ํ์ผ๋ก ๋ณต์ฌํฉ๋๋ค.
SNOWSQL_PWD ํ๊ฒฝ ๋ณ์๋ฅผ ๋น๋ฐ๋ฒํธ์ ์ค์ ํฉ๋๋ค. ์:
export SNOWSQL_PWD='MyPassword'๋ค์๊ณผ ์ ์ฌํ ๋ช ๋ น์ค์ ์ฌ์ฉํ์ฌ ํ๋ก๊ทธ๋จ์ ์คํํฉ๋๋ค(์ฌ์ฉ์ ๋ฐ ๊ณ์ ์ ๋ณด๋ฅผ ์ฌ์ฉ์ ๋ฐ ๊ณ์ ์ ๋ณด๋ก ๋ฐ๊ฟ์ผ ํจ).
๊ฒฝ๊ณ
์ด ์์ ์ ์ํํ๋ฉด ํ๋ก๊ทธ๋จ์ ๋ง์ง๋ง์ ์จ์ดํ์ฐ์ค, ๋ฐ์ดํฐ๋ฒ ์ด์ค ๋ฐ ์คํค๋ง๊ฐ ์ญ์ ๋ฉ๋๋ค! ์์ค๋ ์ ์์ผ๋ฏ๋ก ๊ธฐ์กด ๋ฐ์ดํฐ๋ฒ ์ด์ค์ ์ด๋ฆ์ ์ฌ์ฉํ์ง ๋ง์ญ์์ค.
python3 python_connector_example.py --warehouse <unique_warehouse_name> --database <new_warehouse_zzz_test> --schema <new_schema_zzz_test> --account myorganization-myaccount --user MyUserName
์ถ๋ ฅ์ ๋ค์๊ณผ ๊ฐ์ต๋๋ค.
Connecting...
Creating warehouse, database, schema...
Creating table test_table...
Selecting from test_table...
123, test string1
456, test string2
Closing connection...
๊ธธ์ด๊ฐ ๋ ๊ธด ์๋ ๋ค์๊ณผ ๊ฐ์ต๋๋ค.
์ฐธ๊ณ
๊ณ์ ๋ฐ ๋ก๊ทธ์ธ ์ ๋ณด๋ฅผ ์ค์ ํ ์น์ ์์ Snowflake ๋ก๊ทธ์ธ ์ ๋ณด(์ด๋ฆ, ๋น๋ฐ๋ฒํธ ๋ฑ)์ ์ผ์นํ๋๋ก ๋ณ์๋ฅผ ๋ฐ๊ฟจ๋์ง ํ์ธํ์ญ์์ค.
์ด ์์์๋ format() ํจ์๋ฅผ ์ฌ์ฉํ์ฌ ๋ฌธ์ ๊ตฌ์ฑํฉ๋๋ค. ํ๊ฒฝ์ SQL ์ฝ์ ๊ณต๊ฒฉ์ ์ํ์ด ์๋ ๊ฒฝ์ฐ์๋ format() ํจ์๋ฅผ ์ฌ์ฉํ๋ ๋์ ๊ฐ์ ๋ฐ์ธ๋ฉํ๋ ๊ฒ์ด ์ข์ ์ ์์ต๋๋ค.
#!/usr/bin/env python
#
# Snowflake Connector for Python Sample Program
#
# Logging
import logging
logging.basicConfig(
filename='/tmp/snowflake_python_connector.log',
level=logging.INFO)
import snowflake.connector
# Set ACCOUNT to your account identifier.
# See https://docs.snowflake.com/en/user-guide/gen-conn-config.
ACCOUNT = '<my_organization>-<my_account>'
# Set your login information.
USER = '<login_name>'
PASSWORD = '<password>'
import os
# Only required if you copy data from your S3 bucket
AWS_ACCESS_KEY_ID = os.getenv('AWS_ACCESS_KEY_ID')
AWS_SECRET_ACCESS_KEY = os.getenv('AWS_SECRET_ACCESS_KEY')
# Connecting to Snowflake
con = snowflake.connector.connect(
user=USER,
password=PASSWORD,
account=ACCOUNT,
)
# Creating a database, schema, and warehouse if none exists
con.cursor().execute("CREATE WAREHOUSE IF NOT EXISTS tiny_warehouse")
con.cursor().execute("CREATE DATABASE IF NOT EXISTS testdb")
con.cursor().execute("USE DATABASE testdb")
con.cursor().execute("CREATE SCHEMA IF NOT EXISTS testschema")
# Using the database, schema and warehouse
con.cursor().execute("USE WAREHOUSE tiny_warehouse")
con.cursor().execute("USE SCHEMA testdb.testschema")
# Creating a table and inserting data
con.cursor().execute(
"CREATE OR REPLACE TABLE "
"testtable(col1 integer, col2 string)")
con.cursor().execute(
"INSERT INTO testtable(col1, col2) "
"VALUES(123, 'test string1'),(456, 'test string2')")
# Copying data from internal stage (for testtable table)
con.cursor().execute("PUT file:///tmp/data0/file* @%testtable")
con.cursor().execute("COPY INTO testtable")
# Copying data from external stage (S3 bucket -
# replace <s3_bucket> with the name of your bucket)
con.cursor().execute("""
COPY INTO testtable FROM s3://<s3_bucket>/data/
STORAGE_INTEGRATION = myint
FILE_FORMAT=(field_delimiter=',')
""".format(
aws_access_key_id=AWS_ACCESS_KEY_ID,
aws_secret_access_key=AWS_SECRET_ACCESS_KEY))
# Querying data
cur = con.cursor()
try:
cur.execute("SELECT col1, col2 FROM testtable")
for (col1, col2) in cur:
print('{0}, {1}'.format(col1, col2))
finally:
cur.close()
# Binding data
con.cursor().execute(
"INSERT INTO testtable(col1, col2) "
"VALUES(%(col1)s, %(col2)s)", {
'col1': 789,
'col2': 'test string3',
})
# Retrieving column names
cur = con.cursor()
cur.execute("SELECT * FROM testtable")
print(','.join([col[0] for col in cur.description]))
# Catching syntax errors
cur = con.cursor()
try:
cur.execute("SELECT * FROM testtable")
except snowflake.connector.errors.ProgrammingError as e:
# default error message
print(e)
# user error message
print('Error {0} ({1}): {2} ({3})'.format(e.errno, e.sqlstate, e.msg, e.sfqid))
finally:
cur.close()
# Retrieving the Snowflake query ID
cur = con.cursor()
cur.execute("SELECT * FROM testtable")
print(cur.sfqid)
# Closing the connection
con.close()