AWS IoT Analytics as a CSV Importer
An example of a CSV importer built with AWS IoT Analytics: data is sent by an Amazon S3 trigger and processed by Python Lambda scripts in the Pipeline.
AWS IoT Analytics is a service aimed at the IoT industry, but this time I will use it to build a CSV importer. The merits of building the importer with IoT Analytics are:
- The Channel automatically stores the received data as a backup, and that data can be run through the Pipeline again (for example, after replacing a Pipeline activity you can reprocess the data already stored in the Channel).
- In the Data store stage you can confirm which data was processed by the Pipeline, and that data can also be fed into other heavy processing (such as an AWS Batch job for data analysis).
TL;DR
The result of this article is available here:
https://github.com/kojiisd/aws-iot-csv-importer
2. Lambda programming after the S3 trigger
After data is put to S3, the following program is triggered.
import json
import logging
import os
import urllib.parse

import boto3
import pandas as pd
from more_itertools import chunked  # external package; bundle it with the Lambda

# Clients, logger and constants (placeholder values; see the repository for the actual settings)
logger = logging.getLogger()
logger.setLevel(logging.INFO)
s3_client = boto3.resource('s3')
iot_analytics_client = boto3.client('iotanalytics')
TMP_PATH = '/tmp/'
DATA_BATCH_SIZE = 100  # number of messages sent per batch_put_message call

def convert_from_csv_to_json(file_path, bucket_name, header=False):
    # Read the CSV and serialize it to JSON, with or without column names
    df = pd.read_csv(file_path)
    if header:
        tmp_json = df.to_json(orient='records')
    else:
        tmp_json = df.to_json(orient='values')

    # Wrap every row as an IoT Analytics message (messageId + JSON payload)
    result_json_array = []
    for ele_json in json.loads(tmp_json):
        logger.info(ele_json)
        result_tmp_json = {}
        result_tmp_json['s3_bucket'] = bucket_name
        result_tmp_json['data'] = ele_json
        result_json = {}
        result_json['messageId'] = str(ele_json['account_number'])
        result_json['payload'] = json.dumps(result_tmp_json)
        result_json_array.append(result_json)
    return result_json_array

def send_data_lambda(event, context):
    # Triggered by the S3 put event: download the uploaded CSV to /tmp
    bucket_str = event['Records'][0]['s3']['bucket']['name']
    bucket = s3_client.Bucket(bucket_str)
    key = urllib.parse.unquote_plus(event['Records'][0]['s3']['object']['key'], encoding='utf-8')
    file_path = TMP_PATH + key
    os.makedirs(os.path.dirname(file_path), exist_ok=True)
    bucket.download_file(key, file_path)

    # Convert the CSV rows to messages and send them to the Channel in batches
    json_array = convert_from_csv_to_json(file_path, bucket_str, True)
    for json_sub in chunked(json_array, DATA_BATCH_SIZE):
        response = iot_analytics_client.batch_put_message(
            channelName='csv_import_sample_channel',
            messages=json_sub
        )
    return 'All data sending finished.'
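For reference, the handler expects a standard S3 put event. A minimal hand-written event for a quick smoke test might look like the sketch below; the bucket and key names are placeholders, not the ones used in this article:

# Hypothetical S3 put event to exercise send_data_lambda
sample_event = {
    'Records': [{
        's3': {
            'bucket': {'name': 'csv-import-sample-bucket'},  # placeholder bucket
            'object': {'key': 'input/accounts.csv'}          # placeholder key
        }
    }]
}

print(send_data_lambda(sample_event, None))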
The code is rough, but with this program you can send CSV data to IoT Analytics. This time I use the Elasticsearch sample data "accounts.csv". Its contents look like this:
account_number,address,age,balance,city,email,employer,firstname,gender,lastname,state
1,880 Holmes Lane,32,39225,Brogan,amberduke@pyrami.com,Pyrami,Amber,M,Duke,IL
6,671 Bristol Street,36,5686,Dante,hattiebond@netagy.com,Netagy,Hattie,M,Bond,TN
13,789 Madison Street,28,32838,Nogal,nanettebates@quility.com,Quility,Nanette,F,Bates,VA
18,467 Hutchinson Court,33,4180,Orick,daleadams@boink.com,Boink,Dale,M,Adams,MD
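With header=True, convert_from_csv_to_json turns the first row above into a message roughly like the following (the bucket name is a placeholder):

# Message built from the first CSV row
{
    'messageId': '1',
    'payload': '{"s3_bucket": "csv-import-sample-bucket", "data": {"account_number": 1, "address": "880 Holmes Lane", "age": 32, "balance": 39225, "city": "Brogan", "email": "amberduke@pyrami.com", "employer": "Pyrami", "firstname": "Amber", "gender": "M", "lastname": "Duke", "state": "IL"}}'
}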
I will submit this data to IoT Analytics, but if each record were processed one by one, the Pipeline would be executed far too many times. So I use the Batch Size setting of the Pipeline's Lambda activity. With this setting, up to 100 records are bundled together and handed to the Pipeline's Lambda in a single invocation.
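The batch size lives on the Lambda activity in the Pipeline definition. Below is a minimal sketch of how such a Pipeline could be created with boto3; the channel name comes from the code above, while the pipeline, Lambda function, and Data store names are assumptions:

import boto3

iot_analytics_client = boto3.client('iotanalytics')

# Channel -> Lambda activity (batchSize=100) -> Data store
iot_analytics_client.create_pipeline(
    pipelineName='csv_import_sample_pipeline',           # assumed name
    pipelineActivities=[
        {'channel': {'name': 'channel_activity',
                     'channelName': 'csv_import_sample_channel',
                     'next': 'lambda_activity'}},
        {'lambda': {'name': 'lambda_activity',
                    'lambdaName': 'store_data_lambda',   # assumed Lambda function name
                    'batchSize': 100,
                    'next': 'datastore_activity'}},
        {'datastore': {'name': 'datastore_activity',
                       'datastoreName': 'csv_import_sample_datastore'}},  # assumed name
    ]
)

Note that the Lambda function referenced by the activity also needs a resource-based permission that allows AWS IoT Analytics to invoke it.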
3. Lambda programming executed in the Pipeline
After the data is sent to the Pipeline, the following Lambda code is executed.
import json
import logging
import os

import boto3

# Clients, logger and constants (placeholder values; see the repository for the actual settings)
logger = logging.getLogger()
logger.setLevel(logging.INFO)
s3_client = boto3.resource('s3')
ddb_client = boto3.resource('dynamodb')
TMP_PATH = '/tmp/'
CONF_PATH = 'conf/conf.json'                 # assumed S3 key of the column-mapping file
DYNAMODB_TABLE = 'csv_import_sample_table'   # assumed table name

def store_data_lambda(event, context):
    logger.info("Start store data: {}".format(event))

    # Download the column-mapping configuration from the sender's S3 bucket
    bucket = s3_client.Bucket(event[0]['s3_bucket'])
    key = CONF_PATH
    file_path = TMP_PATH + key
    os.makedirs(os.path.dirname(file_path), exist_ok=True)
    bucket.download_file(key, file_path)
    with open(file_path, 'r') as conf_file:
        conf_json = json.load(conf_file)
    logger.info(conf_json)

    # Rename the columns of each record according to the mapping file
    result_json_array = []
    for ev in event:
        tmp_json = {}
        for key, value in ev['data'].items():
            if key in conf_json.keys():
                tmp_json[conf_json[key]] = value
        result_json_array.append(tmp_json)

    # Store the converted records in DynamoDB with a batch writer
    ddb_table = ddb_client.Table(DYNAMODB_TABLE)
    with ddb_table.batch_writer() as batch:
        for item_json in result_json_array:
            batch.put_item(Item=item_json)

    # The returned array goes on to the next activity / the Data store
    return result_json_array
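The code above reads a column-mapping file from S3 (CONF_PATH) that maps CSV column names to the attribute names used downstream. A hypothetical example of such a file, covering only a few columns:

# Hypothetical contents of the mapping file at CONF_PATH
# CSV column name -> attribute name used in DynamoDB / the Data store
{
    "account_number": "AccountNumber",
    "firstname": "FirstName",
    "lastname": "LastName",
    "balance": "Balance"
}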
With this code, the processing results are stored in DynamoDB, and they are also stored in the time-series Data store of IoT Analytics.
4. The results of execution
For testing, I put a CSV file to S3 and let these processes run. In the Data set part of IoT Analytics, I could confirm the results. (Of course, the results were also stored in DynamoDB.)
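The Data set content comes from a SQL query over the Data store. A minimal sketch of setting this up with boto3, where the Data set and Data store names are assumptions:

import boto3

iot_analytics_client = boto3.client('iotanalytics')

# Define a Data set that selects everything from the Data store (names assumed)
iot_analytics_client.create_dataset(
    datasetName='csv_import_sample_dataset',
    actions=[{
        'actionName': 'select_all',
        'queryAction': {'sqlQuery': 'SELECT * FROM csv_import_sample_datastore'}
    }]
)

# Generate the Data set content, then check it on the console or via get_dataset_content
iot_analytics_client.create_dataset_content(datasetName='csv_import_sample_dataset')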
In DynamoDB, I could confirm the results.
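A quick way to double-check from code is a small scan of the table; the table name here is the same assumption as in the Lambda above:

import boto3

ddb_client = boto3.resource('dynamodb')

# Peek at a few imported items (table name is an assumption)
table = ddb_client.Table('csv_import_sample_table')
for item in table.scan(Limit=5)['Items']:
    print(item)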
5. Re-processing of the Pipeline
On the AWS Console, the data stored in the Channel can be re-processed through the Pipeline with a few clicks.
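The same re-processing can also be started from code via the StartPipelineReprocessing API. A minimal sketch, where the pipeline name and time range are assumptions:

from datetime import datetime, timedelta

import boto3

iot_analytics_client = boto3.client('iotanalytics')

# Re-run the Pipeline over the Channel data of the last 7 days (name and range assumed)
iot_analytics_client.start_pipeline_reprocessing(
    pipelineName='csv_import_sample_pipeline',
    startTime=datetime.utcnow() - timedelta(days=7),
    endTime=datetime.utcnow()
)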
Conclusion
Admittedly this is not the intended use of IoT Analytics, but I was able to build a complete CSV import process easily.