
Living life 12 million audit records a day

Author

Norbert Takács

Since we are managing the authentication/access platform for all of Scandinavia (🇸🇪🇩🇰🇫🇮🇳🇴), we have the duty of managing the security and auditing of every action that happens within our platform.

You wouldn't want your account accessed by an unauthorised third party, and if it were to happen for any reason, you would certainly want to be able to track down who did it and how.

Audit cookie trail

Every login, logout, token refresh, and client access has to leave a field-note-style paper trail, or in our case a digital trail. These events are not tied together, but they can help reconstruct past events based on timestamps and userIds. There is also a legal requirement to keep the audit logs.

We average 12 million log events a day. The data is simple in nature (a few key-value pairs, around 400 characters per record), but there is a lot of it. Thankfully, no trees are harmed in the process of saving our audits. At least not directly.
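To give a feel for the shape, a single audit record looks roughly like the sketch below. The field names mirror the columns we later map into Parquet; everything else about the real records is simplified away.

// Simplified sketch of an audit event – not the full production schema.
// Field names match the Parquet schema used later in this post.
interface AuditEvent {
  id: string; // unique event id
  action?: string; // e.g. 'login', 'logout', 'token-refresh'
  at: number; // unix timestamp in seconds
  status_code?: number; // outcome of the action
  userId?: string; // subject of the event, used to reconstruct what happened
}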

Our previous solution stored the audit data in managed DynamoDB, with its blazingly fast sub-10 ms response times. It was a hands-off solution, but not cheap by any means, given that we didn't access the data often.


The mission was clear: come up with a new solution that would:

  • reduce the money churn
  • keep the simplicity of our data access
  • still offer relatively fast access


Traditional databases were out of the question; we have had bad experiences in the past managing them within our k8s environment. Managed RDS instances also don't come cheap, nor are they meant for storing infrequently accessed cold data. There are alternatives built on top of Postgres that could also work, such as TimescaleDB, and there is RavenDB, which I was told about by a few eager LinkedIn salespeople pitching their products.

Just put it in the bucket 🪣

The solution we thought out was simple: use one of Jeff's simplest storage services, S3 buckets, and save the events in smaller JSON chunks.

Then, for access, we would use AWS Glue, which crawls the files under the S3 prefixes, indexes them, and stores their schema in a Data Catalog (a fancy metadata repository). Querying is done with AWS Athena and its own flavour of SQL.
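For reference, here is a minimal sketch of firing such an Athena query from Node with the aws-sdk v2 client. The database name and results bucket below are placeholders, not our real setup.

import AWS from 'aws-sdk';

const athena = new AWS.Athena({ region: 'eu-west-1' });

// Start a query against the Glue Data Catalog and wait for the result.
// 'audit_db' and 's3://my-athena-results/' are placeholder names.
async function queryAudits(sql: string): Promise<AWS.Athena.ResultSet | undefined> {
  const { QueryExecutionId } = await athena
    .startQueryExecution({
      QueryString: sql,
      QueryExecutionContext: { Database: 'audit_db' },
      ResultConfiguration: { OutputLocation: 's3://my-athena-results/' },
    })
    .promise();

  // Poll until Athena reports a terminal state
  for (;;) {
    const { QueryExecution } = await athena
      .getQueryExecution({ QueryExecutionId: QueryExecutionId! })
      .promise();
    const state = QueryExecution?.Status?.State;
    if (state === 'SUCCEEDED') break;
    if (state === 'FAILED' || state === 'CANCELLED') throw new Error(`query ${state}`);
    await new Promise((resolve) => setTimeout(resolve, 1000));
  }

  const { ResultSet } = await athena
    .getQueryResults({ QueryExecutionId: QueryExecutionId! })
    .promise();
  return ResultSet;
}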


A few weeks of trial and error in our testing environment later, we came up with a data set structure that allowed for better indexing and querying with Athena.

1๐Ÿ“ year=2025/
2โ”‚โ”€โ”€ ๐Ÿ“ month=02/
3โ”‚ โ”‚โ”€โ”€ ๐Ÿ“ date=24/
4โ”‚ โ”‚ โ”‚โ”€โ”€ 00-00.json
5โ”‚ โ”‚ โ”‚โ”€โ”€ 01-00.json
6โ”‚ โ”‚ โ””โ”€โ”€ 02-00.json
7

Year, month, and date prefixes, then the logs get saved each hour as they come in. Amazing!
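As a sketch of the write path, the hourly job can simply put each chunk as newline-delimited JSON under the matching partitioned key. The bucket name and helper below are illustrative, not our production ingester.

import AWS from 'aws-sdk';

const s3 = new AWS.S3({ region: 'eu-west-1' });

// Illustrative only: write one hour of events as newline-delimited JSON
// under the year=/month=/date=/ prefix that Glue will later crawl.
async function writeHourlyChunk(events: Array<Record<string, unknown>>, now: Date): Promise<void> {
  const year = now.getUTCFullYear();
  const month = String(now.getUTCMonth() + 1).padStart(2, '0');
  const date = String(now.getUTCDate()).padStart(2, '0');
  const hour = String(now.getUTCHours()).padStart(2, '0');

  const key = `year=${year}/month=${month}/date=${date}/${hour}-00.json`;
  const body = events.map((event) => JSON.stringify(event)).join('\n');

  await s3.putObject({ Bucket: 'audit-log-bucket', Key: key, Body: body }).promise();
}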


Drop the table, save the money!

The old data got converted from DynamoDB, the table got dropped, and a big cost saving was achieved! ($$$$)

After the solution was fitted into production, the problems started creeping up. As long as we were querying data within a single month, the speed was acceptable at around 8-10 seconds. Not the best, but acceptable for infrequently accessed data!

Unfortunately, the access pattern dictated the last 30 days from the current date, where loading easily went above 25 seconds. It took so long that even the developers working on it started questioning whether the query had hung; they would trigger multiple searches, or avoid the feature altogether and query the data from the AWS console.


Since the money was already saved, revisiting the old solution was out of the question. We spent weeks tweaking the setup, trying to come up with a way to speed it up.


The problem seemed to be twofold:

  • indexing worked for the year and month, but not for days
  • too much data was scanned per query (see the check below)
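Athena actually reports the amount of data scanned for every query execution, which made the second point easy to confirm. A minimal check, assuming the same aws-sdk v2 Athena client as in the earlier sketch:

// Print how many megabytes a finished Athena query scanned.
// Assumes the `athena` client from the earlier sketch.
async function reportScannedData(queryExecutionId: string): Promise<void> {
  const { QueryExecution } = await athena
    .getQueryExecution({ QueryExecutionId: queryExecutionId })
    .promise();
  const scannedBytes = QueryExecution?.Statistics?.DataScannedInBytes ?? 0;
  console.log(`[data-scanned] ${(scannedBytes / 1024 / 1024).toFixed(1)} MB`);
}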

Parquet, CSV on steroids

During my daily online browsing I came across the Parquet file format, which was recommended for similar big-data applications. It was originally developed for Apache Hadoop.

It packs:

  • great compression out of the box
  • a structure similar to the CSV file format
  • columnar storage that excels at reading specific columns

Compared to our JSON chunks, we would skip saving all the repeating key names and would not need to parse the data!

It seemed too good to be true. The only drawback is that we can't read the written files directly and need extra software to browse them.
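The "extra software" part is not a big hurdle in practice. Here is a rough sketch of browsing a file with parquetjs, which also shows the columnar win of reading only the columns you care about (the file path is made up):

import parquet from 'parquetjs';

// Read only the 'id' and 'at' columns from a local Parquet file.
// Because the format is columnar, the other columns are never read from disk.
async function dumpIdsAndTimestamps(path: string): Promise<void> {
  const reader = await parquet.ParquetReader.openFile(path);
  const cursor = reader.getCursor(['id', 'at']);

  let record = await cursor.next();
  while (record) {
    console.log(record);
    record = await cursor.next();
  }
  await reader.close();
}

dumpIdsAndTimestamps('/tmp/02-00.parquet').catch(console.error);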


Laying Parquet!

We converted our JSON chunks into 64 MB Parquet files using a nightly cronjob and instantly experienced a 3x speedup after reindexing. The queries were fast enough to satisfy our requirements.

Stats

Here are some stats before and after the conversion:

Storage: 25% space used reduction

Query speed: 3x speedup (down from 24 seconds to an acceptable 8 seconds)

Data scanned: a whopping 93% reduction!


The S3 bucket and AWS Glue table combined with the Parquet file format really saved us in a pinch. We could also have explored alternative file formats such as ORC, Avro, or Delta, and doubling the chunk size can also affect read speed. Comment below if you have an opinion on these!


It's such a great feeling when a simple solution delivers the expected great results. There are some drawbacks too: the current state of parquetjs support on Node is very limited, and writing these files with the library is somewhat tricky.

For example:

  • once files are written, they can't be appended to
  • a single instance of the writer has to be passed around
  • there is no built-in file size measurement
  • you can't write to memory


If you are feeling so inclined, do fork and contribute to it; humanity would benefit from it!


To end with an actionable item, I am attaching the script we use to process the files from JSON to Parquet:

import 'dotenv/config';
import fs from 'fs';
import AWS from 'aws-sdk';
import parquet from 'parquetjs';

const chunkSizeMB = 64;
const chunkSizeBytes = chunkSizeMB * 1024 * 1024; // Convert MB to bytes

const sourceBucket = process.env.S3_PARQUET_SOURCE_BUCKET ? process.env.S3_PARQUET_SOURCE_BUCKET : 'no-bucket';
const destinationBucket = process.env.S3_PARQUET_DESTINATION_BUCKET
  ? process.env.S3_PARQUET_DESTINATION_BUCKET
  : 'no-bucket';

const defaultConfig = {
  region: 'eu-west-1',
};

const s3Config = defaultConfig;

const s3 = new AWS.S3(s3Config);

// List every object key under the given prefix, paging through with continuation tokens
async function listAllKeys(bucket: string, prefix = ''): Promise<string[]> {
  const keys: string[] = [];
  let continuationToken: string | undefined;

  do {
    const result = await s3
      .listObjectsV2({
        Bucket: bucket,
        Prefix: prefix,
        MaxKeys: 1000,
        ContinuationToken: continuationToken,
      })
      .promise();

    result.Contents?.forEach((object) => {
      if (object.Key) keys.push(object.Key);
    });

    const lastKey = result.Contents?.[result.Contents.length - 1]?.Key;
    if (lastKey) console.log('[batch-loading-keys]: ' + lastKey);
    continuationToken = result.NextContinuationToken;
  } while (continuationToken);

  return keys;
}

// Stream the JSON chunks for one prefix, append their rows into a Parquet writer,
// and upload the file whenever the day changes or it grows past the chunk size
async function copyBucketWithTransformation(
  sourceBucket: string,
  destinationBucket: string,
  prefix = '',
): Promise<void> {
  try {
    const keys = await listAllKeys(sourceBucket, prefix);
    if (keys.length === 0) {
      console.log(`[skip] no objects found under ${prefix}`);
      return;
    }

    const schema = new parquet.ParquetSchema({
      id: { type: 'UTF8' },
      action: { type: 'UTF8', optional: true },
      at: { type: 'TIMESTAMP_MILLIS' },
      status_code: { type: 'INT32', optional: true },
    });

    let newKeyName = keys[0].replace('.json', '.parquet');
    let oldKeyDate = getKeyDate(keys[0]);

    let newFileName = `/tmp/${getKeyFileName(newKeyName)}`;
    let currentFileSize = await getFileSize(newFileName);
    console.log('[initial newWriter] with ' + newFileName);

    let writer = await parquet.ParquetWriter.openFile(schema, newFileName);
    writer.setRowGroupSize(1000);
    for (let index = 0; index < keys.length; index++) {
      const key = keys[index];
      const object = await s3
        .getObject({
          Bucket: sourceBucket,
          Key: key,
        })
        .promise();
      // Each chunk is newline-delimited JSON, one audit event per line
      const items = Buffer.from(object.Body as Buffer)
        .toString('utf8')
        .split(/\r?\n/);

      console.log('[appended file] ' + getKeyFileName(key));
      for (let itemIndex = 0; itemIndex < items.length; itemIndex++) {
        const item = items[itemIndex];
        if (item) {
          const parsed = JSON.parse(item);
          parsed.at = parsed.at * 1000; // seconds -> milliseconds for TIMESTAMP_MILLIS
          await writer.appendRow(parsed);
        }
      }

      const keyDate = getKeyDate(keys[index]);
      // if new day, or byte size is over 64mb, upload
      currentFileSize = await getFileSize(newFileName);
      if (keyDate !== oldKeyDate || currentFileSize > chunkSizeBytes || index === keys.length - 1) {
        // upload previous file
        console.log('[closing writer] ' + getKeyFileName(newKeyName));
        await writer.close();

        const transformedData = await readFileContentAsBytes(newFileName);
        await s3
          .putObject({
            Bucket: destinationBucket,
            Key: newKeyName,
            Body: transformedData,
          })
          .promise();

        console.log(`[uploaded file] ${getKeyFileName(newKeyName)} size: ${currentFileSize}`);
        // remove the file
        fs.unlinkSync(newFileName);
        console.log(`[deleted file] ${getKeyFileName(newKeyName)}`);

        if (index !== keys.length - 1) {
          // start a new file
          newFileName = `/tmp/${getKeyFileName(keys[index + 1])}`;
          newKeyName = keys[index + 1].replace('.json', '.parquet');

          writer = await parquet.ParquetWriter.openFile(schema, newFileName);
          writer.setRowGroupSize(1000);
          console.log('-----------------------------------------');

          console.log('[new-writer] ' + newFileName);
        }
      }
      oldKeyDate = getKeyDate(key);
    }

    console.log(`[finish] Current file batch ${prefix} from ${sourceBucket} to ${destinationBucket}`);
  } catch (error) {
    console.error('Error copying bucket:', error);
  }
}

// Function to generate all valid prefixes between two dates
const generatePrefixes = (startDate: string, endDate: string) => {
  const prefixes: string[] = [];
  const currentDate = new Date(startDate);

  const end = new Date(endDate);

  while (currentDate <= end) {
    const year = currentDate.getFullYear();
    const month = String(currentDate.getMonth() + 1).padStart(2, '0');
    const day = String(currentDate.getDate()).padStart(2, '0');

    prefixes.push(`year=${year}/month=${month}/date=${day}/`);

    // Move to the next day
    currentDate.setDate(currentDate.getDate() + 1);
  }
  return prefixes;
};

const getPreviousDayPrefix = () => {
  const date = new Date();
  date.setDate(date.getDate() - 1); // Move to the previous day

  const year = date.getFullYear();
  const month = String(date.getMonth() + 1).padStart(2, '0');
  const day = String(date.getDate()).padStart(2, '0');

  return `year=${year}/month=${month}/date=${day}/`;
};

const main = async () => {
  let datePrefixes = [getPreviousDayPrefix()];

  const prefixFromEnv = process.env.DATE_TO_PROCESS;

  // takes date prefixes from env in this format DATE_TO_PROCESS=2025-01-25/2025-01-29
  if (prefixFromEnv) {
    const [startDate, endDate] = prefixFromEnv.split('/');
    datePrefixes = generatePrefixes(startDate, endDate);
  }

  for (const prefix of datePrefixes) {
    try {
      console.log(`Processing prefix: ${prefix}`);
      await copyBucketWithTransformation(sourceBucket, destinationBucket, prefix);
    } catch (error) {
      console.error(`Failed to process prefix: ${prefix}`, error);
    }
  }
};

// Return the size of a file in bytes, or 0 if it does not exist yet
async function getFileSize(filePath: string): Promise<number> {
  return new Promise((resolve, reject) => {
    fs.stat(filePath, (error, stats) => {
      if (error) {
        if (error.code === 'ENOENT') {
          resolve(0); // File doesn't exist
        } else {
          reject(error); // Other errors, such as permission issues
        }
      } else {
        resolve(stats.size); // File exists, return its size
      }
    });
  });
}

// Extract the partition prefix (year=/month=/date=/) of a key, i.e. everything before the file name
const getKeyDate = (key: string): string => {
  const keyChunks = key.split('/');
  keyChunks.pop(); // drop the file name
  return keyChunks.join('/');
};

// Extract the file name part of a key
const getKeyFileName = (key: string): string => {
  const keyChunks = key.split('/');
  return keyChunks.pop() ?? '';
};

// Read a local file into a Buffer, rejecting on any error
async function readFileContentAsBytes(filePath: string): Promise<Buffer> {
  return new Promise((resolve, reject) => {
    fs.readFile(filePath, (error, data) => {
      if (error) {
        reject(error);
      } else {
        resolve(data); // Return the content as a Buffer (raw bytes)
      }
    });
  });
}

main();
Join the discussion on GitHub