I recently came across a medium-sized dataset (around 500GB across 30 CSV files, roughly 900 million rows by 40 columns) that needed to be briefly analysed and then collapsed to an aggregation level decided by that initial analysis.

I ended up using some Python to get the data into a PostgreSQL database and perform the aggregation. Then, out of interest, I ran the same analysis using Google BigQuery and saw a remarkable speed improvement.

I wanted to share the Python script and the method in case someone can offer me some tips for the future:

Step A: Query using PostgreSQL

call_robocopy(path_of_csv) # Copy files to a fast SSD (working directory)
pandas_temp_table(path_2_csv, mytable) # Read in a sample using pandas to get a feel for the structure and create a template table
create_postgresql_table(mytable) # Copy the structure of the ‘temporary table’
copy_csv_to_table(path_2_csv, mytable) # Upload all the CSVs to the table using the faster COPY command, i.e. without loading into RAM first
sql_query_to_csv(mytable, csv_results) # Perform my query and save the results
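The appendix defines these functions but never shows them being called, so here is a minimal driver sketch (the source folder is a placeholder, and note that call_robocopy as written in the appendix takes the source and destination folders rather than a single path):

if __name__ == "__main__":
    call_robocopy(from_folder="D:/raw_csvs/", to_folder=path_2_csv)  # source folder is a placeholder
    pandas_temp_table(path_2_csv, mytable)    # sample the data and build a template table
    create_postgresql_table(mytable)          # clone the template structure into the real table
    copy_csv_to_table(path_2_csv, mytable)    # bulk-load every CSV with COPY
    sql_query_to_csv(mytable, csv_results)    # run the aggregation and dump the results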

Unfortunately, I did not get a chance to time this precisely, but it took a very long time (more than a day). I was new to PostgreSQL at the time and decided to experiment.

Step B: Optimising PostgreSQL and re-running 

I re-ran the same query as above, but on a much smaller sample (only around 200 million rows), to get a feel for the time: 239 minutes.


I then edited my postgresql.conf with the following settings suggested by PgTune:

max_connections = 20
shared_buffers = 512MB
effective_cache_size = 24GB
work_mem = 275251kB
maintenance_work_mem = 2097151kB
checkpoint_segments = 128
checkpoint_completion_target = 0.9
wal_buffers = 16MB
default_statistics_target = 500
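As a side note, these values could also be applied from Python with ALTER SYSTEM rather than by hand-editing the conf file. A minimal sketch, assuming PostgreSQL 9.4+ (checkpoint_segments pre-dates 9.5 and so stays in the conf file here, and settings such as shared_buffers only take effect after a server restart):

import psycopg2

# Same placeholder credentials as the appendix script
settings = {
    "max_connections": "20",
    "shared_buffers": "512MB",
    "effective_cache_size": "24GB",
    "work_mem": "275251kB",
    "maintenance_work_mem": "2097151kB",
    "checkpoint_completion_target": "0.9",
    "wal_buffers": "16MB",
    "default_statistics_target": "500",
}

con = psycopg2.connect(database="...", user="postgres", password="password")
con.autocommit = True  # ALTER SYSTEM cannot run inside a transaction block
cur = con.cursor()
for name, value in settings.items():
    cur.execute("ALTER SYSTEM SET {0} = %s;".format(name), (value,))
cur.execute("SELECT pg_reload_conf();")  # reloads whatever does not need a full restart
con.close()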

These settings reduced the time taken to 184 minutes.


Still very slow; however, the query ran on just one core, with no fancy MapReduce-style speed-ups.

I have recently discovered agg, which claims to offer just that: “Parallel aggregations for PostgreSQL”, and will post back with results.
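In the meantime, here is a rough sketch of a poor man's parallel aggregation with plain multiprocessing (this is not the agg extension): each worker opens its own connection, aggregates one month's slice, and the partial results are concatenated at the end. The connection, table, and output names reuse the placeholders from the appendix script, and "event_date" / "amount" are made-up column names rather than my real schema:

import multiprocessing

import pandas as pd
import psycopg2

# Same placeholders as the appendix script
mydatabase, myusername, mypassword = "...", "postgres", "password"
mytable, csv_results = "...", ".../.csv"

def aggregate_month(month):
    """Aggregate a single month's slice of the table on its own connection."""
    con = psycopg2.connect(database=mydatabase, user=myusername, password=mypassword)
    query = """
            SELECT EXTRACT(YEAR FROM "event_date") AS "YEAR",
                   EXTRACT(MONTH FROM "event_date") AS "MONTH",
                   SUM("amount") AS "TOTAL",
                   COUNT(1) AS "N_ROWS"
            FROM {0}
            WHERE EXTRACT(MONTH FROM "event_date") = %s
            GROUP BY 1, 2;
            """.format(mytable)
    df = pd.read_sql_query(query, con, params=(month,))
    con.close()
    return df

if __name__ == "__main__":
    with multiprocessing.Pool(processes=4) as pool:  # one worker (and PostgreSQL backend) per core
        partials = pool.map(aggregate_month, range(1, 13))
    pd.concat(partials, ignore_index=True).to_csv(csv_results, index=False)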

Step C: Google BigQuery

Based on Dremel, this service seemed like exactly what I needed to speed up the aggregation.


I recreated the same sample as in my two tests above (curiously, uploading to Google Cloud and then appending to the table took about as long as pushing to the PostgreSQL database) and ran the same query.

It executed in 19 seconds. Just a bit faster than my local 11,040-second (184-minute) analysis.
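For reference, the load-and-query round trip can also be scripted with the google-cloud-bigquery client library. A minimal sketch, where the bucket, dataset, table, and column names are placeholders rather than what I actually used:

from google.cloud import bigquery

client = bigquery.Client()  # picks up the project and credentials from the environment

# Load the '~'-delimited CSVs straight from a Cloud Storage bucket into a table
load_job = client.load_table_from_uri(
    "gs://my-bucket/my-prefix*.csv",
    "mydataset.mytable",
    job_config=bigquery.LoadJobConfig(
        source_format=bigquery.SourceFormat.CSV,
        field_delimiter="~",
        autodetect=True,
    ),
)
load_job.result()  # blocks until the load job finishes

# Run the same GROUP BY aggregation
query = """
        SELECT EXTRACT(YEAR FROM event_date) AS year,
               EXTRACT(MONTH FROM event_date) AS month,
               SUM(amount) AS total,
               COUNT(1) AS n_rows
        FROM `mydataset.mytable`
        GROUP BY year, month
        """
for row in client.query(query).result():
    print(row.year, row.month, row.total, row.n_rows)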


Appendix: Python Script

import os
import pandas as pd
import psycopg2
import sys
import time
from subprocess import call
from sqlalchemy import create_engine

#Setup GLOBALS
path_2_csv = ".../"
csv_results = ".../.csv"
mytable = "..."
mydatabase = "..."
myusername = "postgres"
mypassword = "password"


def call_robocopy(from_folder='',
                  to_folder=path_2_csv,
                  my_log='H:/robocopy_log.txt'):
    """
    Copy files to working directory
    robocopy <Source> <Destination> [<File>[ ...]] [<Options>]
    We want to copy the files to a fast SSD drive
    """
    if os.path.isdir(from_folder) and os.path.isdir(to_folder):
        call(["robocopy", from_folder, to_folder, "/LOG:%s" % my_log])
    else:
        print("Paths not entered correctly")

def pandas_temp_table(path_2_csv='',
                      tmp_table='',):
    """
    Upload data to a temporary table first, using pandas to identify optimal data types for the columns.
    pandas is not speed-efficient as it issues INSERT commands rather than COPY: COPY took around 16 minutes
    on average to get a 15GB CSV into the database (door-to-door), whereas pandas.to_sql took around 50 minutes.
    """
    # Pandas can use sqlalchemy engine
    engine = create_engine('postgresql://%s:%s@localhost:5432/%s' %(myusername, mypassword, mydatabase))
    if engine:
        print('Connected: %s' % engine)
    else:
        print('Connection lost')
        sys.exit(1)

    tmp_table += '_temp'
    counter = 0
    start_time = time.time()
    for i in os.listdir(path_2_csv):
        # Cycle through all CSVs and upload a small chunk to make sure everything is OK
        if counter < 1:
            if i.endswith(".csv") & i.startswith("..."):
                print("Reading CSV: %s into PANDAs data-frame" % i)

                # Option 1: read just the first 1,000,000 rows
                # df = pd.read_csv(os.path.join(path_2_csv, i), nrows=1000000, header=None, sep='~')  # sep=None would auto-detect the delimiter by sniffing

                # Option 2: read up to 20,000,000 rows of the file
                df = pd.read_csv(os.path.join(path_2_csv, i), nrows=20000000, header=None, sep='~', parse_dates=[2, 3])
                # My dates were in columns 2 and 3
                # The column names were not present in the original CSVs
                df.columns = [
                            '...'
                        ]
                print("CSV read-in successfully")
                print(df.shape)
                print("Uploading %s to SQL Table: %s" % (i, tmp_table))
                df.to_sql(tmp_table, engine, if_exists='append', index=False)
                counter += 1
                current_speed = ((time.time()-start_time)/60)/counter
                print("Average speed is %.2f minutes per database" % current_speed)
                print("Successfully uploaded: %d" % counter)

    end_time = time.time()
    print("Total duration of INSERT: %.2f minutes" % (end_time - start_time)/60)

def create_postgresql_table(my_table=''):
    """
    Create table copying the structure of the temp table created using pandas
    Timer to benchmark
    """
    # Connect
    con = psycopg2.connect(database=mydatabase, user=myusername, password=mypassword)
    cur = con.cursor()
    if con:
        print('Connected: %s' % con)
    else:
        print('Connection lost')
        sys.exit(1)

    try:
        # Check if table exists already
        cur.execute("""
                    SELECT relname FROM pg_class WHERE relname = '{0}';
                    """.format(my_table))
        table_test = cur.fetchone()[0]
    except Exception as e:
        print('Table %s does not exist' % my_table)
        table_test = None

    if table_test:
        print('%s already exists' % my_table)
    else:
        print('Creating table: %s' % my_table)
        try:
            # Copy structure and no data (1=2 is false)
            cur.execute("""
                        CREATE TABLE {0} AS SELECT * FROM {1} WHERE 1=2;
                        """.format(my_table, my_table+'_temp'))
            con.commit()
            print('Table created successfully')
        except psycopg2.DatabaseError as e:
            if con:
                con.rollback()
            print('Error %s' % e)
            sys.exit(1)
    con.close()


def copy_csv_to_table(path_2_csv='',
                      my_table=''):
    """
    Use the PostgreSQL COPY command to bulk-copy the CSVs into the newly created table
    """
    # Connect
    con = psycopg2.connect(database=mydatabase, user=myusername, password=mypassword)
    cur = con.cursor()
    if con:
        print('Connected: %s' % con)
    else:
        print('Connection lost')
        sys.exit(1)

    copy_sql = """
               COPY %s FROM STDIN WITH (FORMAT csv, DELIMITER '~');
               """ % my_table
    counter = 0
    start_time = time.time()

    for i in os.listdir(path_2_csv):
        if i.endswith(".csv") and i.startswith("..."):
            print("Uploading %s to %s" % (i, mytable))
            with open(os.path.join(path_2_csv, i), 'r') as f:
                cur.copy_expert(sql=copy_sql, file=f)
                con.commit()
                counter += 1
                print("Successfully uploaded %d CSVs" % counter)
                current_speed = ((time.time()-start_time)/60)/counter
                print("Average speed is %.2f minutes per database" % current_speed)
    con.close()
    end_time = time.time()
    print("Total duration of COPY: %.2f minutes" % ((end_time - start_time)/60))


def sql_query_to_csv(my_table='',
                     csv_out=''):
    """
    Submit query to created PostgreSQL database and output results to a CSV
    """
    # Connect
    con = psycopg2.connect(database=mydatabase, user=myusername, password=mypassword)
    cur = con.cursor()
    if con:
        print('Connected: %s' % con)
    else:
        print('Connection lost')
        sys.exit(1)


    my_query = """
                SELECT
                    SUM("..."),
                    SUM("..."),
                    SUM("..."),
                    COUNT(1) AS "...",
                    EXTRACT(YEAR FROM "...") AS "YEAR",
                    EXTRACT(MONTH FROM "...") AS "MONTH",
                    "...",
                FROM {0}
                GROUP BY
                    "...",
                """.format(my_table)
    start_time = time.time()
    print(start_time)
    output_query = "COPY ({0}) TO STDOUT WITH CSV HEADER".format(my_query)
    with open(csv_out, 'w') as f:
        cur.copy_expert(output_query, f)
        print("Successfully submitted results to: %s" % csv_out)
    con.close()
    print("Total duration of Query: %.2f minutes" % ((time.time() - start_time)/60))