Spanner Database to Postgres

NIRAV SHAH
Jul 19, 2021

Migrating data from one environment to another is a usual task every developer gets. If we use the command line, we can automate it and set it up as per our requirements. Let's see how we can export and import a database.

Export Spanner Data

From Spanner we can export data in Avro format, which is the default format for exporting data.

gcloud dataflow jobs run my-export-job \
--gcs-location='gs://dataflow-templates/latest/Cloud_Spanner_to_GCS_Avro' \
--region=us-east1 \
--disable-public-ips \
--service-account-email="spanner-backup@myproject.iam.gserviceaccount.com" \
--parameters='instanceId=myspannerInstance,databaseId=mydb,outputDir=gs://spanner_backup_01/mydb' \
--max-workers=10 \
--project=myproject
createTime: '2021-07-09T01:13:57.582627Z'
currentStateTime: '1970-01-01T00:00:00Z'
id: 2021-07-08_18_13_56-3583574098731082022
location: us-east1
name: my-export-job
projectId: myproject
startTime: '2021-07-09T01:13:57.582627Z'
type: JOB_TYPE_BATCH

Wait for job completion

A simple command using details captured from the output above. We can poll it in a loop until the state reads Done (a small polling sketch follows the command).

gcloud dataflow jobs list --project myproject --filter="name=my-export-job" --format=json | jq '.[]|.state'
"Done"

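If you want the wait fully automated, here is a minimal polling sketch in Python that shells out to the same gcloud command. The job and project names are the ones used above; the 30-second interval and the terminal states checked (Done, Failed, Cancelled) are assumptions.

import json
import subprocess
import time

def wait_for_job(name="my-export-job", project="myproject", interval=30):
    # Poll the Dataflow job state until it reaches a terminal state.
    # Assumes the gcloud CLI is installed and authenticated.
    while True:
        out = subprocess.check_output([
            "gcloud", "dataflow", "jobs", "list",
            "--project", project,
            "--filter", "name=" + name,
            "--format", "json",
        ])
        jobs = json.loads(out)
        state = jobs[0]["state"] if jobs else None
        print("current state:", state)
        if state in ("Done", "Failed", "Cancelled"):
            return state
        time.sleep(interval)

wait_for_job()
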
Small Python program to read Avro and write to Postgres

First install the Python dependencies needed to read Avro and write to Postgres.

pip3.6 install psycopg2-binary
pip3.6 install fastavro
pip3.6 install python-snappy

Connect to Postgres RDS locally

Connect with Session Manager; review the connection pattern in the reference link.

ssh devuser@i-023841b3311095xx2 -L 9090:my-postgredb-sg.cluster-c3g329ovnrbl.ap-southeast-1.rds.amazonaws.com:5432

Code to import from Avro to Postgres

Sample code for importing data to Postgres, assuming the exported Avro file has been copied locally from the GCS output directory. There are a lot of improvements that can be done (a batched variant is sketched after the code); for now we keep this as a workable model for a small dataset.

import psycopg2
from fastavro import reader

# Establishing the connection through the local SSH tunnel
conn = psycopg2.connect(
    database="mydb", user='myuser', password='mypwd', host='127.0.0.1', port='9090'
)

# Insert statement for the target table
sql = """INSERT INTO user_imported(identity,identity_type)
VALUES(%s,%s);"""

with open('user.avro-00000-of-00001', 'rb') as fo:
    avro_reader = reader(fo)
    line_count = 0
    for row in avro_reader:
        try:
            print("processed for: " + row["identity"])
            cursor = conn.cursor()
            cursor.execute(sql, (row["identity"], row["identity_type"]))
            conn.commit()
            cursor.close()
            line_count += 1
            if not (line_count % 1000):
                print("No Of Records Processed: " + str(line_count))
        except Exception as e:
            print(e)

# Closing the connection
conn.close()
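
One of the improvements mentioned above is to batch the inserts instead of committing per row. Here is a minimal sketch using psycopg2's execute_values, assuming the same table, file name, and connection details as the code above; the batch size of 1000 is an arbitrary choice.

import psycopg2
from psycopg2.extras import execute_values
from fastavro import reader

BATCH_SIZE = 1000  # assumed batch size, tune as needed

conn = psycopg2.connect(
    database="mydb", user='myuser', password='mypwd', host='127.0.0.1', port='9090'
)
# execute_values expands a single %s placeholder into many value tuples
sql = "INSERT INTO user_imported(identity, identity_type) VALUES %s"

with open('user.avro-00000-of-00001', 'rb') as fo, conn.cursor() as cursor:
    batch = []
    for row in reader(fo):
        batch.append((row["identity"], row["identity_type"]))
        if len(batch) >= BATCH_SIZE:
            execute_values(cursor, sql, batch)
            conn.commit()
            batch = []
    if batch:  # flush the final partial batch
        execute_values(cursor, sql, batch)
        conn.commit()

conn.close()

Committing once per batch cuts down round trips and transaction overhead; the trade-off is that a mid-run failure may leave a partially imported batch to clean up or re-run.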

Reference:

Disclaimer: This activity does not cover real-time data migration; that has to be taken care of separately.
