In my last post, I shared how CJA and the new cjar library from Ben Woodard changed the game for analyzing event data in R. However, it’s often not enough to only analyze data – you have to be able to do something with it. In this post, I’m going to show you how to take data that you have in an R data frame, and insert it directly into the Adobe Experience Platform (AEP), so you can do exciting stuff like:
- Filter bots from your event data using sophisticated algorithms
- Cluster your users and analyze them in Analysis Workspace
- Build a propensity model to predict which users are likely to churn or convert
- Insert data like customer lifetime value or other computed attributes from R into the AEP Unified Profile for activation or personalization
To get data from R into AEP, you’ll need to follow these three steps:
- Put your R data into an AEP-compatible schema
- Write the data into the supported JSON format
- Push the data to AEP using the batch ingestion API
Let’s get started!
Getting R Data Into an AEP-Compatible Schema
The first step is converting an R data frame into something AEP can understand. All data in AEP is based on the Experience Data Model (or XDM for short). In general, there are three types of data in AEP: event data (based on the “XDM Experience Event” schema class), profile data (based on the “XDM Individual Profile” schema class), and lookup data (based on the “Record” schema behavior). For the sake of the example here, we’re going to focus on creating a profile dataset as that’s often most useful for clustering, bot filtering, propensity models, or ingestion into AEP’s Unified Profile. Still, you can follow the same basic steps for any AEP data.
Before we go further, a quick note about data scale: R alone can't comfortably handle really large numbers of records, so if you have tens of millions or more records to send to AEP, you'll also need some additional R libraries. My favorite is sparklyr, which provides a dplyr interface to Apache Spark. In the examples below, I'll show you how to work with both "normal" sized data and "big" data.
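If you do end up needing Spark, a minimal sketch of spinning up a local sparklyr session looks something like this (a local master is assumed here; connecting to a real cluster will need its own configuration):

library(sparklyr)

# Connect to a local Spark instance; spark_install() can download one if you don't have it yet
sc = spark_connect(master = "local")

The connection object sc is what the Spark examples later in this post would hang off of.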
Let’s start with a simple data frame that represents a classification of some of my CJA users (really easy to do if you’re using cjar – use the “adobe_personid” as your dimension):
| adobe_personid | propensity_score | some_number | some_text |
|----------------|------------------|-------------|-----------|
| ABCD1234       | 0.58             | 100         | red       |
| WXYZ5678       | 0.21             | 55          | blue      |
| MNOP91011      | 0.01             | 88          | yellow    |
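In case you want to follow along, here's one way to build that same data frame in R using the values from the table above:

library(tibble)

# The example classification data from the table above
my_profile_data = tribble(
  ~adobe_personid, ~propensity_score, ~some_number, ~some_text,
  "ABCD1234",      0.58,              100,          "red",
  "WXYZ5678",      0.21,              55,           "blue",
  "MNOP91011",     0.01,              88,           "yellow"
)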
First, we need to create an AEP schema and dataset corresponding to the data above. You can read all about creating schemas in the AEP documentation, which I will leave to the reader – however, in our case, we’ll want an XDM Individual Profile schema that looks something like this (with your tenant id rather than the “_tenantid” shown below):
Notice that I've named the fields in my schema to match the column names of my R data frame – naming things the same will make life much easier later. Once I have a schema ready, creating a dataset from it is straightforward, and you'll have a dataset ready to go in no time.
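If you'd rather script this part too, a rough sketch of creating the dataset through AEP's Catalog Service API is below. The endpoint and request body are my assumptions based on the Catalog documentation rather than something from this post, the schema ID is a placeholder, and it reuses the httr calls and credentials we'll set up in the final section:

# Hypothetical sketch: create a dataset from an existing schema via the Catalog Service API.
# SOME_SCHEMA_ID is a placeholder, and org_id, sandbox, access_token, and api_key come from
# the authentication section later in this post.
create_dataset_response = POST(
  url = "https://platform.adobe.io/data/foundation/catalog/dataSets",
  add_headers(
    `Content-Type` = "application/json",
    `x-gw-ims-org-id` = org_id,
    `x-sandbox-name` = sandbox,
    `Authorization` = paste0("Bearer ", access_token),
    `x-api-key` = api_key
  ),
  body = paste0('{"name":"R Profile Scores",',
                '"schemaRef":{"id":"https://ns.adobe.com/_mytenantid/schemas/SOME_SCHEMA_ID",',
                '"contentType":"application/vnd.adobe.xed-full+json;version=1"}}')
)
content(create_dataset_response)  # should return a reference to the new dataset if the call succeeds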
Writing R Data Into Properly Formatted JSON
The trickiest part about formatting your data is nesting the data in your R data frame under your tenant ID such that we can write an AEP-compatible JSON file to upload. The easiest way to do this is using a few tidyverse functions to nest each row individually under your tenant ID:
library(tidyverse)
library(jsonlite)

json_file = "profile_data.json"

# Using the data frame above, named my_profile_data:
my_profile_data %>%
  group_split(adobe_personid) %>%                # one tibble per person (i.e. per row)
  map_df(~ nest(.x, data = everything())) %>%    # nest each row's columns into a "data" column
  mutate(data = map(data, unbox)) %>%            # drop the extra array "box" around each record
  rename(`_mytenantid` = data) %>%               # rename to your actual tenant ID
  stream_out(con = file(json_file))              # write newline-delimited JSON
The group_split() is there to ensure the nesting operation happens on each row individually (rather than converting all the rows into one giant array). The map_df(), map(), and unbox() functions then nest the data under a single field while removing the extra array "boxes" (i.e. the '[' and ']') that jsonlite would otherwise wrap around each record. Finally, rename the nested column to whatever your specific tenant ID is and use the stream_out() function to write the data to a file.
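If the unbox() step seems mysterious, here's a tiny illustration of what it changes; by default jsonlite serializes a one-row data frame as an array, and unbox() removes that wrapper so each record becomes a plain object:

library(jsonlite)

toJSON(list(`_mytenantid` = data.frame(x = 1)))         # {"_mytenantid":[{"x":1}]}  boxed in an array
toJSON(list(`_mytenantid` = unbox(data.frame(x = 1))))  # {"_mytenantid":{"x":1}}    unboxed, which is what we want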
The output should be a single file where each row in your R data frame is a single line of JSON:
{"_mytenantid":{"adobe_personid":"ABCD1234","propensity_score":0.58,"some_number":100,"some_text":"red"}}
{"_mytenantid":{"adobe_personid":"WXYZ5678","propensity_score":0.21,"some_number":55,"some_text":"blue"}}
{"_mytenantid":{"adobe_personid":"MNOP91011","propensity_score":0.01,"some_number":88,"some_text":"yellow"}}
If you're doing this at scale with the sparklyr library, you can accomplish this with some help from the sparklyr.nested library, after loading your data into a Spark data frame using sdf_copy_to (or just by reading in a file you have, whatever the case may be):
library(sparklyr)
library(sparklyr.nested)
my_data_export = my_profile_sparkdataframe %>%
  sdf_nest(adobe_personid, propensity_score, some_number, some_text, .key = "_mytenantid")
# To view the schema as a spot check you can:
my_data_export %>% sdf_schema_viewer()
# Depending on the data size, you can partition it to any number
# of files. AEP has a max of 512MB per file
single_partition_export = sdf_repartition(my_data_export, partitions = 1)
# You can export as JSON or Parquet (AEP supports both)
# but Parquet packs a lot more data into a smaller file size
spark_write_json(single_partition_export, "/somedirectory")
This will produce the same output format we had above but for much larger datasets if needed.
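Since the Spark example assumes you already have a Spark data frame, here's a quick sketch of what loading one with sdf_copy_to() (using the sc connection from earlier) might look like, along with the Parquet variant of the write:

# Copy an in-memory R data frame into Spark; at real scale you'd read files into Spark directly
my_profile_sparkdataframe = sdf_copy_to(sc, my_profile_data, name = "my_profile_data", overwrite = TRUE)

# Parquet alternative to spark_write_json()
spark_write_parquet(single_partition_export, "/somedirectory")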
Pushing Data Into AEP Using the Batch Ingestion API
The final step is pushing our JSON file into an AEP dataset using the Batch Ingestion API. This API requires an API project set up in the Adobe Developer Console, which I won't go over here, but Ben Woodard put together an excellent guide in his adobeanalyticsr documentation that walks you through the basic steps. After following the guide, you should end up with the following:
- API key
- Client secret
- Org ID
- “Technical account ID”
- Private key
- Sandbox (usually “prod”)
Using the jose, openssl, httr, and fpeek libraries, you can then use the following function to generate an access token. Note: I'm not following best security practices here by putting keys and secrets directly in my code; this is only meant to show how things basically work, so I'll leave it to you to store your credentials properly:
library(jose)
library(openssl)  # provides read_key() for loading the private key
library(httr)
library(fpeek)

api_key = "some_api_key"
client_secret = "some_client_secret"
org_id = "SOMEORGID@AdobeOrg"
technical_account_id = "SOMETECHACCOUNTID@techacct.adobe.com"
private_key = read_key("/somedirectory/mc_private.key")
sandbox = "prod"

get_access_token = function(){
  # Request a token that expires 24 hours from now
  expiration = round(as.numeric(Sys.time()) + 24*60*60)

  ### Build and sign the JWT
  claim = jwt_claim(
    exp = expiration,
    iss = org_id,
    sub = technical_account_id,
    `https://ims-na1.adobelogin.com/s/ent_dataservices_sdk` = TRUE,
    `https://ims-na1.adobelogin.com/s/ent_cja_sdk` = TRUE,
    aud = paste0("https://ims-na1.adobelogin.com/c/", api_key)
  )
  jwt = jwt_encode_sig(claim = claim, key = private_key)

  ### Exchange the JWT with IMS for an access token
  jwt_token = POST(
    url = "https://ims-na1.adobelogin.com/ims/exchange/jwt",
    add_headers(
      "Content-Type" = "multipart/form-data"
    ),
    body = list(
      "client_id" = api_key,
      "client_secret" = client_secret,
      "jwt_token" = jwt
    ),
    encode = "multipart"
  )
  access_token = content(jwt_token)$access_token
  return(access_token)
}
access_token = get_access_token()
With that function we can now add our JSON file to an AEP dataset. The only additional thing you’ll need is the dataset ID you’ll be adding the JSON file to, and you can find that in the AEP UI where you created the dataset earlier.
The basic steps to adding the JSON file to AEP are:
- Open a new dataset batch
- Write the JSON (or Parquet) file to the batch
- Close the batch
This function does all three in one fell swoop:
add_json_to_batch = function(dataset_id, json_file){
  ### Open a batch for the data to go into
  open_batch_response = POST(
    url = "https://platform.adobe.io/data/foundation/import/batches",
    add_headers(
      `Content-Type` = "application/json",
      `x-gw-ims-org-id` = org_id,
      `x-sandbox-name` = sandbox,
      `Authorization` = paste0("Bearer ", access_token),
      `x-api-key` = api_key
    ),
    body = paste0('{"datasetId":"', dataset_id, '", "inputFormat":{"format":"json"}}')
  )
  batch_id = content(open_batch_response)$id

  ### Write the file to the batch
  write_file_response = PUT(
    url = paste0("https://platform.adobe.io/data/foundation/import/batches/", batch_id,
                 "/datasets/", dataset_id, "/files/", "data.json"),
    add_headers(
      `Content-Type` = "application/octet-stream",
      `x-gw-ims-org-id` = org_id,
      `x-sandbox-name` = sandbox,
      `Authorization` = paste0("Bearer ", access_token),
      `x-api-key` = api_key
    ),
    body = upload_file(json_file)
  )

  ### Close the batch
  close_batch_response = POST(
    url = paste0("https://platform.adobe.io/data/foundation/import/batches/", batch_id, "?action=COMPLETE"),
    add_headers(
      `x-gw-ims-org-id` = org_id,
      `x-sandbox-name` = sandbox,
      `Authorization` = paste0("Bearer ", access_token),
      `x-api-key` = api_key
    )
  )

  return(paste0("Success! Batch ID ", batch_id, " was created. There were ",
                sapply(json_file, peek_count_lines), " records in the batch."))
}
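With both functions defined, the actual upload is a one-liner (the dataset ID below is just a placeholder for your own):

add_json_to_batch("some_dataset_id", json_file)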
And that’s all there is to it! Once data has been successfully ingested, you should also see the new batch appear in the dataset UI:
Conclusion
Getting data into AEP from R can be extremely awesome – especially if you want your teams to be able to analyze the output of a model you build, add data to your CDP Unified Profile, or filter out bot traffic algorithmically. Additionally, using the cronR library makes it easy to schedule your models to run automatically and update AEP for you.
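For example, a minimal cronR sketch along those lines might look like this (the script path, schedule, and id are just placeholders):

library(cronR)

# Schedule an R script that rebuilds the scores and pushes them to AEP every night
cmd = cron_rscript("/somedirectory/score_and_upload_to_aep.R")
cron_add(cmd, frequency = "daily", at = "2AM", id = "aep_profile_upload")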
Hopefully, this is helpful – best of luck to you! Feel free to hit me up with questions on LinkedIn or Twitter!