Living life, 12 million audit records a day
Author
Norbert Takács
Since we manage the authentication/access platform for all of Scandinavia, we have the duty of ensuring the security and auditability of every action that happens within our platform.
You wouldn't want your account accessed by an unauthorised third party, and if it were to happen for any reason, you would certainly want to be able to track down who did it and how.
Audit cookie trail
Every login, logout, token refresh and client access has to leave a field-note-style paper trail, or in our case a digital one. These events are not tied together, but they can help reconstruct past events based on timestamps and userIds. There is also a legal requirement to keep audit logs.
We average 12 million log events a day. The data is simple in nature (a few key-value pairs, around 400 characters per record), but there is a lot of it. Thankfully no trees are harmed in the process of saving our audits. At least not directly.
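To give a rough idea, a single event looks something like the hypothetical record below. The field names mirror the Parquet schema used in the conversion script at the end of this post; the values and the extra userId key are purely illustrative.

```typescript
// Hypothetical audit event; field names mirror the Parquet schema used later in
// this post, the values are made up. Real records carry a few more key-value
// pairs, roughly 400 characters in total.
interface AuditEvent {
  id: string;           // unique event id
  userId?: string;      // subject of the event, used to reconstruct past activity
  action?: string;      // e.g. 'LOGIN', 'LOGOUT', 'TOKEN_REFRESH'
  at: number;           // unix timestamp in seconds
  status_code?: number; // outcome of the request
}

const exampleEvent: AuditEvent = {
  id: 'evt-4711',
  userId: 'user-1234',
  action: 'LOGIN',
  at: 1740355200,
  status_code: 200,
};
```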
Our previous solution stored the audit data in managed DynamoDB, with its blazingly fast sub-10ms response times. It was a hands-off solution, but not cheap by any means for data we didn't access often.
The mission was clear: come up with a new solution that would:
- reduce the money churn
- keep the simplicity of our data access
- keep access relatively fast
Traditional databases were out of the question; we have had bad experiences managing them within our k8s environment in the past. Managed RDS instances don't come cheap either, nor are they meant for storing infrequently accessed cold data. There are alternatives that would also work, such as TimescaleDB (built on top of Postgres) and RavenDB, or so I was told by a few eager LinkedIn salespeople pitching their products.
Just put it in the bucket 🪣
The solution we came up with was simple: use one of Jeff's simplest storage solutions, S3 buckets, and save the events in smaller JSON chunks.
For access we would use AWS Glue, which would crawl the files under the S3 prefixes, index them and store their information in a Data Catalog (a fancy metadata repository). Querying would be done with AWS Athena and its own flavour of SQL.
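For reference, registering such a crawler programmatically looks roughly like the sketch below, using the same aws-sdk v2 as the conversion script at the end of the post. The crawler name, IAM role, database and bucket path are all placeholders.

```typescript
import AWS from 'aws-sdk';

const glue = new AWS.Glue({ region: 'eu-west-1' });

// Sketch: point a Glue crawler at the audit bucket so the discovered schema and
// partitions land in the Data Catalog and become queryable from Athena.
// All names, ARNs and paths below are placeholders.
async function createAuditCrawler(): Promise<void> {
  await glue
    .createCrawler({
      Name: 'audit-logs-crawler',
      Role: 'arn:aws:iam::123456789012:role/glue-crawler-role',
      DatabaseName: 'audit_logs_db',
      Targets: { S3Targets: [{ Path: 's3://audit-logs-bucket/' }] },
      Schedule: 'cron(0 2 * * ? *)', // re-crawl nightly to pick up new partitions
    })
    .promise();
}
```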
A few weeks of trial and error in our testing environment later, we came up with a data set structure that allowed for better indexing and querying with Athena.
```
📁 year=2025/
└── 📁 month=02/
    └── 📁 date=24/
        ├── 00-00.json
        ├── 01-00.json
        └── 02-00.json
```
Year, month and date prefixes, then save the logs each hour as they come in. Amazing!
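With those prefixes registered as partition columns, Athena can prune a query down to a single day. Below is a sketch of what issuing such a query looks like through aws-sdk v2; the database, table and results-bucket names are assumptions.

```typescript
import AWS from 'aws-sdk';

const athena = new AWS.Athena({ region: 'eu-west-1' });

// Sketch: fetch one day of audit events. The partition columns (year/month/date)
// match the S3 prefixes above; database, table and output bucket are placeholders.
async function queryOneDay(): Promise<string> {
  const { QueryExecutionId } = await athena
    .startQueryExecution({
      QueryString: `
        SELECT id, action, at, status_code
        FROM audit_events
        WHERE year = '2025' AND month = '02' AND "date" = '24'
        ORDER BY at DESC
        LIMIT 100
      `,
      QueryExecutionContext: { Database: 'audit_logs_db' },
      ResultConfiguration: { OutputLocation: 's3://athena-query-results-bucket/' },
    })
    .promise();

  return QueryExecutionId as string;
}
```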
Drop the table, save the money!
The old data got converted from DynamoDB, the table got dropped, and a big cost saving was achieved! ($$$$)
After the solution was fitted into production, the problems started creeping up. As long as we were querying data within a single month, the speed was acceptable at around 8-10 seconds. Not the best, but acceptable for infrequently accessed data!
Unfortunately, the access pattern dictated the last 30 days from the current date, where loading easily went above 25 seconds. It took so long that even the developers working on it started questioning whether the query had hung, triggering multiple searches, or avoided using it altogether and queried the data from the AWS console instead.
Since the money was already saved, revisiting the old solution was out of the question. We spent weeks tweaking the setup, trying to come up with a way to speed it up.
The problem seemed to be twofold:
- indexing worked for year and month, but not for days
- too much data was scanned per query (sketched below)
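To illustrate the second point: a "last 30 days" window almost always straddles a month boundary, so the partition filter has to OR two month predicates together, and with day-level pruning not working the engine ended up reading far more JSON than the query needed. Roughly (dates are examples, column quoting as Athena requires for the reserved word date):

```typescript
// Illustrative shape of the partition filter for a "last 30 days" query.
// The window spans two months; because day-level pruning was not working,
// far more of those months' JSON chunks got scanned than the query needed.
const lastThirtyDaysFilter = `
  (year = '2025' AND month = '01' AND "date" >= '26')
  OR (year = '2025' AND month = '02' AND "date" <= '24')
`;
```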
Parquet, CSV on steroids
During my daily online browsing I came across the Parquet file format, which was recommended for similar big data applications. It was originally developed for Apache Hadoop.
It packs:
- great compression out of the box
- a structure similar to the CSV file format
- columnar storage that excels at reading specific columns
Compared to our JSON chunks, we would skip saving all the repeating key names and would not need to parse the data!
It seemed too good to be true. The only drawback is that we can't read the written files directly and need extra software to browse them.
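Even a quick peek at a written file from Node goes through parquetjs itself (or a separate tool). A minimal sketch; the file path is a placeholder:

```typescript
import parquet from 'parquetjs';

// Sketch: dump the rows of a local Parquet file. There is no plain-text view,
// so browsing the data requires the library (or an external tool).
async function dumpParquetFile(path: string): Promise<void> {
  const reader = await parquet.ParquetReader.openFile(path);
  const cursor = reader.getCursor();

  let row = await cursor.next();
  while (row) {
    console.log(row);
    row = await cursor.next();
  }

  await reader.close();
}

dumpParquetFile('/tmp/00-00.parquet').catch(console.error);
```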
Laying Parquet!
We converted our JSON chunks into 64MB Parquet files using a nightly cronjob and instantly experienced a 3x speed-up after reindexing. The queries were fast enough to satisfy our requirements.
Stats
Here are some stats before and after the conversion:
- Storage: 25% reduction in space used
- Query speed: 3x speed-up (down from 24 seconds to an acceptable 8 seconds)
- Data scanned: a whopping 93% reduction!
The S3 bucket and AWS Glue table combined with the Parquet file format really saved us in a pinch. In addition, we could have explored alternative file formats such as ORC, Avro or Delta, and doubling the chunk sizes could also affect read speed. Comment below if you have an opinion on these!
It's such a great feeling when a simple solution gives the expected great results. There are some drawbacks too: the current state of parquetjs support on Node is very limited, and writing these files with the library is also somewhat tricky.
For example:
- once files are written, they can't be appended to
- a single instance of the writer has to be passed around
- there is no built-in file size measurement
- it can't write to memory
If you are feeling so inclined, do fork and contribute to it; humanity would benefit from it!
Finally, an actionable item: I am attaching the script we use to process files from JSON to Parquet.
```typescript
import 'dotenv/config';
import fs from 'fs';
import AWS from 'aws-sdk';
import parquet from 'parquetjs';

const chunkSizeMB = 64;
const chunkSizeBytes = chunkSizeMB * 1024 * 1024; // Convert MB to bytes

const sourceBucket = process.env.S3_PARQUET_SOURCE_BUCKET ? process.env.S3_PARQUET_SOURCE_BUCKET : 'no-bucket';
const destinationBucket = process.env.S3_PARQUET_DESTINATION_BUCKET
  ? process.env.S3_PARQUET_DESTINATION_BUCKET
  : 'no-bucket';

const defaultConfig = {
  region: 'eu-west-1',
};

const s3Config = defaultConfig;

const s3 = new AWS.S3(s3Config);

// List every object key under a prefix, following pagination.
async function listAllKeys(bucket: string, prefix = ''): Promise<string[]> {
  const keys: string[] = [];
  let continuationToken: string | undefined;

  do {
    const result = await s3
      .listObjectsV2({
        Bucket: bucket,
        Prefix: prefix,
        MaxKeys: 1000,
        ContinuationToken: continuationToken,
      })
      .promise();

    result.Contents?.forEach((object) => {
      if (object.Key) keys.push(object.Key);
    });

    const lastKey = result.Contents?.[result.Contents.length - 1]?.Key;
    if (lastKey) console.log('[batch-loading-keys]: ' + lastKey);
    continuationToken = result.NextContinuationToken;
  } while (continuationToken);

  return keys;
}

// Read the JSON chunks under a prefix and rewrite them as ~64MB Parquet files
// in the destination bucket, rolling over to a new file on day change or size limit.
async function copyBucketWithTransformation(
  sourceBucket: string,
  destinationBucket: string,
  prefix = '',
): Promise<void> {
  try {
    const keys = await listAllKeys(sourceBucket, prefix);
    if (keys.length === 0) {
      console.log(`[skip] No objects found under prefix ${prefix}`);
      return;
    }

    const schema = new parquet.ParquetSchema({
      id: { type: 'UTF8' },
      action: { type: 'UTF8', optional: true },
      at: { type: 'TIMESTAMP_MILLIS' },
      status_code: { type: 'INT32', optional: true },
    });

    let newKeyName = keys[0].replace('json', 'parquet');
    let oldKeyDate = getKeyDate(keys[0]);

    let newFileName = `/tmp/${getKeyFileName(newKeyName)}`;
    let currentFileSize = await getFileSize(newFileName);
    console.log('[initial newWriter] with ' + newFileName);

    let writer = await parquet.ParquetWriter.openFile(schema, newFileName);
    writer.setRowGroupSize(1000);

    for (let index = 0; index < keys.length; index++) {
      const key = keys[index];
      const object = await s3
        .getObject({
          Bucket: sourceBucket,
          Key: key,
        })
        .promise();
      const items = Buffer.from(object.Body as Buffer)
        .toString('utf8')
        .split(/\r?\n/);

      console.log('[appended file] ' + getKeyFileName(key));
      for (let itemIndex = 0; itemIndex < items.length; itemIndex++) {
        const item = items[itemIndex];
        if (item) {
          const parsed = JSON.parse(item);
          parsed.at = parsed.at * 1000; // source stores seconds, schema expects TIMESTAMP_MILLIS
          await writer.appendRow(parsed);
        }
      }

      const keyDate = getKeyDate(keys[index]);
      // if new day, byte size is over 64mb, or this is the last key: upload
      currentFileSize = await getFileSize(newFileName);
      if (keyDate !== oldKeyDate || currentFileSize > chunkSizeBytes || index === keys.length - 1) {
        // upload previous file
        console.log('[closing writer] ' + getKeyFileName(newKeyName));
        await writer.close();

        const transformedData = await readFileContentAsBytes(newFileName);
        await s3
          .putObject({
            Bucket: destinationBucket,
            Key: newKeyName,
            Body: transformedData as Buffer,
          })
          .promise();

        console.log(`[uploaded file] ${getKeyFileName(newKeyName)} size: ${currentFileSize}`);
        // remove the file
        fs.unlinkSync(newFileName);
        console.log(`[deleted file] ${getKeyFileName(newKeyName)}`);

        if (index !== keys.length - 1) {
          // start a new file
          newFileName = `/tmp/${getKeyFileName(keys[index + 1])}`;
          newKeyName = keys[index + 1].replace('json', 'parquet');

          writer = await parquet.ParquetWriter.openFile(schema, newFileName);
          writer.setRowGroupSize(1000);
          console.log('-----------------------------------------');
          console.log('[new-writer] ' + newFileName);
        }
      }
      oldKeyDate = getKeyDate(key);
    }

    console.log(`[finish] Current file batch ${prefix} from ${sourceBucket} to ${destinationBucket}`);
  } catch (error) {
    console.error('Error copying bucket:', error);
  }
}

// Function to generate all valid prefixes between two dates
const generatePrefixes = (startDate: string, endDate: string) => {
  const prefixes: string[] = [];
  let currentDate = new Date(startDate);
  const end = new Date(endDate);

  while (currentDate <= end) {
    const year = currentDate.getFullYear();
    const month = String(currentDate.getMonth() + 1).padStart(2, '0');
    const day = String(currentDate.getDate()).padStart(2, '0');

    prefixes.push(`year=${year}/month=${month}/date=${day}/`);

    // Move to the next day
    currentDate.setDate(currentDate.getDate() + 1);
  }
  return prefixes;
};

const getPreviousDayPrefix = () => {
  const date = new Date();
  date.setDate(date.getDate() - 1); // Move to the previous day

  const year = date.getFullYear();
  const month = String(date.getMonth() + 1).padStart(2, '0');
  const day = String(date.getDate()).padStart(2, '0');

  return `year=${year}/month=${month}/date=${day}/`;
};

const main = async () => {
  let datePrefixes = [getPreviousDayPrefix()];

  const prefixFromEnv = process.env.DATE_TO_PROCESS;

  // takes date prefixes from env in this format: DATE_TO_PROCESS=2025-01-25/2025-01-29
  if (prefixFromEnv) {
    const [startDate, endDate] = prefixFromEnv.split('/');
    datePrefixes = generatePrefixes(startDate, endDate);
  }

  for (const prefix of datePrefixes) {
    try {
      console.log(`Processing prefix: ${prefix}`);
      await copyBucketWithTransformation(sourceBucket, destinationBucket, prefix);
    } catch (error) {
      console.error(`Failed to process prefix: ${prefix}`, error);
    }
  }
};

async function getFileSize(filePath: string): Promise<number> {
  return new Promise((resolve, reject) => {
    fs.stat(filePath, (error, stats) => {
      if (error) {
        if (error.code === 'ENOENT') {
          resolve(0); // File doesn't exist yet
        } else {
          reject(error); // Other errors, such as permission issues
        }
      } else {
        resolve(stats.size); // File exists, return its size
      }
    });
  });
}

// year=YYYY/month=MM/date=DD portion of a key
const getKeyDate = (key: string): string => {
  const keyChunks = key.split('/');
  keyChunks.pop();
  return keyChunks.join('/');
};

// file name portion of a key
const getKeyFileName = (key: string): string => {
  return key.split('/').pop() ?? key;
};

async function readFileContentAsBytes(filePath: string): Promise<Buffer | null> {
  return new Promise((resolve, reject) => {
    fs.readFile(filePath, (error, data) => {
      if (error) {
        if (error.code === 'ENOENT') {
          resolve(null); // File doesn't exist, return null
        } else {
          reject(error); // Other errors, reject with the error
        }
      } else {
        resolve(data); // Return the content as a Buffer (raw bytes)
      }
    });
  });
}

main();
```