Tags: google-bigquery, google-cloud-run, google-cloud-storage

Denormalize a GCS file before uploading to BigQuery

Published on 2020-04-03 23:38:36

I have written a Cloud Run API in .NET Core that reads files from a GCS location and is then supposed to denormalize them (i.e. add more information to each row, including textual descriptions) and write the result to a BigQuery table. I have two options:

  1. My Cloud Run API could create denormalized CSV files and write them to another GCS location. Then another Cloud Run API could pick up those denormalized CSV files and write them straight to BigQuery.
  2. My Cloud Run API could read the original CSV file, denormalize it in memory (file stream), and then somehow write from the in-memory file stream straight to the BigQuery table (a rough sketch of this is below).

What is the best way to write to BigQuery in this scenario if performance (speed) and cost (monetary) are my goals? These files are roughly 10 KB each before denormalizing, and each row is roughly 1,000 characters; after denormalizing, a file is about three times that size. I do not need to keep the denormalized files after they are successfully loaded into BigQuery. I am concerned about performance, as well as any BigQuery daily quotas around inserts/writes. I don't think there are any unless you are doing DML statements, but correct me if I'm wrong.
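
To make option 2 more concrete, here is a rough Python sketch of what I have in mind (my actual service is .NET Core, so this is only an illustration; denormalize_line, the bucket handling, and the table id are placeholders):

import io

from google.cloud import bigquery
from google.cloud import storage


def denormalize_line(line):
    # Placeholder for the enrichment logic: look up the textual
    # descriptions for the codes in this row and append them as columns.
    return line


def denormalize_and_load(bucket_name, file_name):
    storage_client = storage.Client()
    bq_client = bigquery.Client()

    # Read the original ~10 KB CSV from GCS into memory.
    blob = storage_client.bucket(bucket_name).blob(file_name)
    original_csv = blob.download_as_string().decode('utf-8')

    # Denormalize each line in memory, with no intermediate GCS file.
    denormalized_csv = '\n'.join(
        denormalize_line(line) for line in original_csv.splitlines())

    # Load the in-memory buffer straight into the target table via a load job.
    job_config = bigquery.LoadJobConfig()
    job_config.source_format = bigquery.SourceFormat.CSV
    job_config.write_disposition = bigquery.WriteDisposition.WRITE_APPEND
    job_config.skip_leading_rows = 1  # if the CSV has a header row

    load_job = bq_client.load_table_from_file(
        io.BytesIO(denormalized_csv.encode('utf-8')),
        'my-project.my_dataset.my_table',  # placeholder table id
        job_config=job_config)
    load_job.result()  # wait for the load job to complete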

Questioner: AIK DO
Viewed: 70

Answered by Juancki, 2020-01-31 20:05

I would use Cloud Functions that are triggered when you upload a file to a bucket.

This is so common that Google has a tutorial, with an accompanying repo, for exactly this with JSON files: Streaming data from Cloud Storage into BigQuery using Cloud Functions.

Then, I would modify the example main.py file from:

def streaming(data, context):
    '''This function is executed whenever a file is added to Cloud Storage'''
    bucket_name = data['bucket']
    file_name = data['name']
    db_ref = DB.document(u'streaming_files/%s' % file_name)
    if _was_already_ingested(db_ref):
        _handle_duplication(db_ref)
    else:
        try:
            _insert_into_bigquery(bucket_name, file_name)
            _handle_success(db_ref)
        except Exception:
            _handle_error(db_ref)

To this, which accepts CSV files:

import json
import csv
import os

from google.cloud import bigquery
from google.cloud import storage



PROJECT_ID = os.getenv('GCP_PROJECT')
BQ_DATASET = 'fromCloudFunction'
BQ_TABLE = 'mytable'

CS = storage.Client()
BQ = bigquery.Client()


def streaming(data, context):
    '''This function is executed whenever a file is added to Cloud Storage'''
    bucket_name = data['bucket']
    file_name = data['name']

    newRows = postProcessing(bucket_name, file_name)

    # It is recommended that you save 
    # what you process for debugging reasons.
    destination_bucket = 'post-processed' # gs://post-processed/
    destination_name = file_name
    # saveRowsToBucket(newRows,destination_bucket,destination_name)
    rowsInsertIntoBigquery(newRows)



class BigQueryError(Exception):
    '''Exception raised whenever a BigQuery error happened''' 

    def __init__(self, errors):
        super().__init__(self._format(errors))
        self.errors = errors

    def _format(self, errors):
        err = []
        for error in errors:
            err.extend(error['errors'])
        return json.dumps(err)

def postProcessing(bucket_name, file_name):
    # Download the uploaded CSV from GCS and parse it row by row.
    blob = CS.get_bucket(bucket_name).blob(file_name)
    my_str = blob.download_as_string().decode('utf-8')
    csv_reader = csv.DictReader(my_str.splitlines())
    newRows = []
    for row in csv_reader:
        modified_row = row  # Add your denormalization logic here (see the sketch below)
        newRows.append(modified_row)
    return newRows

def rowsInsertIntoBigquery(rows):
    # Stream the processed rows into the destination table (streaming insert API).
    table = BQ.dataset(BQ_DATASET).table(BQ_TABLE)
    errors = BQ.insert_rows_json(table, rows)
    if errors != []:
        raise BigQueryError(errors)

It would still be necessary to define your map (row -> newRow), and the function saveRowsToBucket if you need it.
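
For illustration only, they could look something like this; the lookup dictionary, the field names, and the destination bucket handling are invented placeholders, and the sketch reuses the CS client and the json import from the snippet above:

PRODUCT_DESCRIPTIONS = {
    'P001': 'Blue widget, 10 cm',
    'P002': 'Red widget, 15 cm',
}

def modifyRow(row):
    # row is a dict produced by csv.DictReader; denormalize it by
    # attaching the textual description for its product code.
    row['product_description'] = PRODUCT_DESCRIPTIONS.get(
        row.get('product_code'), 'unknown')
    return row

def saveRowsToBucket(rows, destination_bucket, destination_name):
    # Persist the processed rows as newline-delimited JSON for debugging.
    bucket = CS.get_bucket(destination_bucket)
    blob = bucket.blob(destination_name)
    blob.upload_from_string(
        '\n'.join(json.dumps(row) for row in rows),
        content_type='application/json')

You would then set modified_row = modifyRow(row) inside postProcessing and uncomment the saveRowsToBucket call in streaming.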