Use binary COPY table FROM with psycopg2

Here is the binary equivalent of COPY FROM for Python 3:

from io import BytesIO
from struct import pack
import psycopg2

# Two rows of data; "id" is not in the upstream data source
# Columns: node, ts, val1, val2
data = [(23253, 342, -15.336734, 2494627.949375),
        (23256, 348, 43.23524, 2494827.949375)]

conn = psycopg2.connect("dbname=mydb user=postgres")
curs = conn.cursor()

# Determine starting value for sequence
curs.execute("SELECT nextval('num_data_id_seq')")
id_seq = curs.fetchone()[0]

# Make a binary file object for COPY FROM
cpy = BytesIO()

# 11-byte signature, no flags, no header extension
cpy.write(pack('!11sii', b'PGCOPY\n\377\r\n\0', 0, 0))

# Columns: id, node, ts, val1, val2
# Zip: (column position, format, size)
row_format = list(zip(range(-1, 4),
                      ('i', 'i', 'h', 'f', 'd'),
                      ( 4,   4,   2,   4,   8 )))
for row in data:
    # Number of columns/fields (always 5)
    cpy.write(pack('!h', 5))
    for col, fmt, size in row_format:
        value = (id_seq if col == -1 else row[col])
        cpy.write(pack('!i' + fmt, size, value))
    id_seq += 1  # manually increment sequence outside of database

# File trailer
cpy.write(pack('!h', -1))

# Copy data to database
cpy.seek(0)
curs.copy_expert("COPY num_data FROM STDIN WITH BINARY", cpy)

# Update sequence on database
curs.execute("SELECT setval('num_data_id_seq', %s, false)", (id_seq,))
conn.commit()
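The example data here has no NULLs, so every field is packed with its fixed size. In the binary format a NULL field is written as a field length of -1 with no data bytes following it; a hypothetical variant of the inner loop that tolerates None values would look like this:

# Hypothetical variant of the inner loop that allows NULLs:
# in COPY BINARY a NULL field is a length word of -1 with no payload.
for col, fmt, size in row_format:
    value = (id_seq if col == -1 else row[col])
    if value is None:
        cpy.write(pack('!i', -1))  # NULL marker, no data bytes
    else:
        cpy.write(pack('!i' + fmt, size, value))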

Update

I rewrote the above approach to writing the files for COPY. My data in Python is in NumPy arrays, so it makes sense to use these. Here is some example data with 1M rows and 7 float columns (plus id, node, and ts):

import psycopg2
import numpy as np
from struct import pack
from io import BytesIO
from datetime import datetime

conn = psycopg2.connect("dbname=mydb user=postgres")
curs = conn.cursor()

# NumPy record array
shape = (7, 2000, 500)
print('Generating data with %i rows, %i columns' % (shape[1]*shape[2], shape[0]))

dtype = ([('id', 'i4'), ('node', 'i4'), ('ts', 'i2')] +
         [('s' + str(x), 'f4') for x in range(shape[0])])
data = np.empty(shape[1]*shape[2], dtype)
data['id'] = np.arange(shape[1]*shape[2]) + 1
data['node'] = np.tile(np.arange(shape[1]) + 1, shape[2])
data['ts'] = np.repeat(np.arange(shape[2]) + 1, shape[1])
data['s0'] = np.random.rand(shape[1]*shape[2]) * 100
prv = 's0'
for nxt in data.dtype.names[4:]:
    data[nxt] = data[prv] + np.random.rand(shape[1]*shape[2]) * 10
    prv = nxt

In my database, I have two tables that look like this:

CREATE TABLE num_data_binary
(
  id integer PRIMARY KEY,
  node integer NOT NULL,
  ts smallint NOT NULL,
  s0 real,
  s1 real,
  s2 real,
  s3 real,
  s4 real,
  s5 real,
  s6 real
) WITH (OIDS=FALSE);

and another similar table named num_data_text.

Here are some simple helper functions to prepare the data for COPY (both text and binary formats) by using the information in the NumPy record array:

def prepare_text(dat):
    cpy = BytesIO()
    for row in dat:
        # Tab-separated values, one row per line, encoded to bytes
        cpy.write(('\t'.join([repr(x) for x in row]) + '\n').encode())
    return(cpy)

def prepare_binary(dat):
    pgcopy_dtype = [('num_fields','>i2')]
    for field, dtype in dat.dtype.descr:
        # Each field is preceded by its byte length; all values big-endian
        pgcopy_dtype += [(field + '_length', '>i4'),
                         (field, dtype.replace('<', '>'))]
    pgcopy = np.empty(dat.shape, pgcopy_dtype)
    pgcopy['num_fields'] = len(dat.dtype)
    for i in range(len(dat.dtype)):
        field = dat.dtype.names[i]
        pgcopy[field + '_length'] = dat.dtype[i].itemsize  # field length in bytes
        pgcopy[field] = dat[field]
    cpy = BytesIO()
    # 11-byte signature, no flags, no header extension
    cpy.write(pack('!11sii', b'PGCOPY\n\377\r\n\0', 0, 0))
    cpy.write(pgcopy.tobytes())  # all rows
    cpy.write(pack('!h', -1))  # file trailer
    return(cpy)
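As a quick sanity check, prepare_binary can be run on a small made-up record array and the first bytes inspected. This is only an illustration (the check array is not part of the benchmark data), and it assumes np and pack from the snippets above are in scope:

# Confirm prepare_binary() emits the 19-byte PGCOPY header
# followed by the first row's field count.
check = np.array([(1, 10, 1, 0.5), (2, 11, 1, 1.5)],
                 dtype=[('id', 'i4'), ('node', 'i4'), ('ts', 'i2'), ('s0', 'f4')])
buf = prepare_binary(check).getvalue()
assert buf[:11] == b'PGCOPY\n\377\r\n\0'  # signature
assert buf[19:21] == pack('!h', 4)        # 4 fields in the first row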

This is how I'm using the helper functions to benchmark the two COPY format methods:

def time_pgcopy(dat, table, binary):
    print('Processing copy object for ' + table)
    tstart = datetime.now()
    if binary:
        cpy = prepare_binary(dat)
    else:  # text
        cpy = prepare_text(dat)
    tendw = datetime.now()
    print('Copy object prepared in ' + str(tendw - tstart) + '; ' +
          str(cpy.tell()) + ' bytes; transferring to database')
    cpy.seek(0)
    if binary:
        curs.copy_expert('COPY ' + table + ' FROM STDIN WITH BINARY', cpy)
    else:  # text
        curs.copy_from(cpy, table)
    conn.commit()
    tend = datetime.now()
    print('Database copy time: ' + str(tend - tendw))
    print('        Total time: ' + str(tend - tstart))
    return

time_pgcopy(data, 'num_data_text', binary=False)
time_pgcopy(data, 'num_data_binary', binary=True)
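Note that re-running these two calls requires empty target tables, since id is a primary key in both. Something along these lines (assuming the same connection and cursor as above) clears them first:

# Empty both target tables before repeating the benchmark
curs.execute('TRUNCATE num_data_text, num_data_binary')
conn.commit()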

Here is the output from the last two time_pgcopy commands:

Processing copy object for num_data_text
Copy object prepared in 0:01:15.288695; 84355016 bytes; transferring to database
Database copy time: 0:00:37.929166
        Total time: 0:01:53.217861
Processing copy object for num_data_binary
Copy object prepared in 0:00:01.296143; 80000021 bytes; transferring to database
Database copy time: 0:00:23.325952
        Total time: 0:00:24.622095

So both the NumPy → file and file → database steps are much faster with the binary approach. The biggest difference is in how Python prepares the COPY file, which is very slow for the text format. Overall, for this schema, the binary format loads into the database in about two-thirds of the time of the text format.

Lastly, I compared the values in both tables within the database to see whether the numbers differed. About 1.46% of the rows have different values for column s0, and this fraction increases to 6.17% for s6 (probably related to the way I generated the random values, since each s column accumulates on the previous one). The non-zero absolute differences across all 7M 32-bit float values range from 9.3132257e-010 to 7.6293945e-006. These small differences between the text and binary loading methods are due to the loss of precision from the float → text → float conversions required by the text format method.
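The comparison itself can be done directly in the database; for example, a query along these lines counts the differing rows for s0 and s6 (a sketch using the table and column names from the schema above, not necessarily the exact query behind the percentages):

# Sketch: count rows where the text- and binary-loaded values differ
curs.execute("""
    SELECT sum(CASE WHEN t.s0 <> b.s0 THEN 1 ELSE 0 END) AS s0_diff,
           sum(CASE WHEN t.s6 <> b.s6 THEN 1 ELSE 0 END) AS s6_diff,
           count(*) AS total
    FROM num_data_text t
    JOIN num_data_binary b ON t.id = b.id
""")
print(curs.fetchone())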