

Snowflake External Functions and Serverless Talend Jobs

TalendSolutionExpert
Contributor II


Last Update:

Jan 22, 2024 9:35:30 PM

Updated By:

Jamie_Gregory

Created date:

Apr 1, 2021 6:00:21 AM

Snowflake is considered a leader among Cloud Data Warehousing platforms. In this article, I am not going to go into depth on why that is. Instead, I want to discuss an exciting feature that Snowflake has recently made available, called External Functions. External Functions are like User Defined Functions (UDFs), but, as the name implies, they call functions that run outside of the Snowflake Data Warehouse. As of this writing, Snowflake supports only AWS API Gateway for these calls, but I expect support for other platforms to follow. This is exciting because you can now extend a SQL query with a function that passes data elements from each row to another application and returns that application's result as part of your result set.

In this article, we are going to show you how to set up your Talend Job, Lambda Function, AWS API Gateway, and Snowflake Data Warehouse. At the end, our query will call the newly created external function, which sends the data to our Talend Job for processing. Let's get started!

Content:

 

Create a Talend Job

In this example, our Talend Job will take an email address from the database and validate it with the Data Quality component tVerifyEmail. The first thing we need to understand is the format of the data that Snowflake sends, which is documented on the Snowflake External Functions page. The input and output formats share the same construct: a JSON object with a single key, "data", whose value is an array of rows. Each row is itself an array of values, and its first value is always the zero-based row index. Here is an example of a request from Snowflake:

{
    "data": [
                [0, 10, "Alex", "Wed, 01 Jan 2014 16:00:00 -0800"],
                [1, 20, "Steve", "Wed, 01 Jan 2015 16:00:00 -0800"],
                [2, 30, "Alice", "Wed, 01 Jan 2016 16:00:00 -0800"],
                [3, 40, "Adrian", "Wed, 01 Jan 2017 16:00:00 -0800"]
            ]
}

Here is an example of a data format response back to Snowflake:

{
    "data":
        [
            [ 0, 1995 ],
            [ 1, 1974 ],
            [ 2, 1983 ],
            [ 3, 2001 ]
        ]
}
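To make the contract concrete, here is a minimal sketch, in Java like the Lambda handler later in this article, that serializes results into the response shape shown above. SnowflakeEnvelope, Row, toResponse, and quote are hypothetical names. Values are written as JSON strings because our function will return varchar, and only quotes and backslashes are escaped; a real implementation would use a JSON library.

```java
import java.util.List;

// Hypothetical helper: builds the response body for a Snowflake external
// function. Each output row is [index, value], where index is the
// zero-based row number copied from the matching input row.
final class SnowflakeEnvelope {

    // One result row: the input row's index and the value computed for it.
    static final class Row {
        final int index;
        final String value;

        Row(int index, String value) {
            this.index = index;
            this.value = value;
        }
    }

    // Serializes rows as {"data":[[0,"..."],[1,"..."]]}.
    static String toResponse(List<Row> rows) {
        StringBuilder sb = new StringBuilder("{\"data\":[");
        for (int i = 0; i < rows.size(); i++) {
            Row r = rows.get(i);
            if (i > 0) {
                sb.append(',');
            }
            sb.append('[').append(r.index).append(',')
              .append(quote(r.value)).append(']');
        }
        return sb.append("]}").toString();
    }

    // Minimal JSON string quoting: escapes backslashes and double quotes only
    // (assumption: values contain no control characters).
    static String quote(String s) {
        return '"' + s.replace("\\", "\\\\").replace("\"", "\\\"") + '"';
    }
}
```

The important detail is that each output row reuses the index of the input row it answers, which is how Snowflake correlates each result with its record.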

Let's have a look at our completed Talend Job, and then walk through each component step by step.

[Screenshot: the completed Talend Job]

  1. tFixedFlowInput: This component is the input to our Talend Job. First, we create a Context Variable called input. In tFixedFlowInput, we then define a single column called inputRow whose value is that context variable, so the column carries the input sent by Snowflake.

    Context Variable:

    [Screenshot: the input context variable definition]

    tFixedFlowInput:

    [Screenshot: the tFixedFlowInput component settings]

  2. tExtractJsonFields: This component extracts the fields we need from the Snowflake input. In our example, the input is the following:

    {"data":[[0,"taylor49@adventure-works.com"],[1,"faith38@adventure-works.com"]]}

    [Screenshot: the tExtractJsonFields component settings]

    Edit the schema and add the two fields index and email.

    In the component Basic Settings tab, for the Loop Jsonpath query add the following:

    • "$.data[*]"

    In the Mapping section, add the following:

    • Column: index – Json query: "@[0]"
    • Column: email – Json query: "@[1]"

  3. tVerifyEmail: This component will take the email address and validate it against the default regular expression, as well as Talend’s list of Top-Level Domains.

    Column to validate: Select email from the drop-down list.

    [Screenshot: the tVerifyEmail component settings]

  4. tMap_2: This component creates a static value that the next component, tWriteJSONField, uses so that our response JSON is in the correct format. The column call holds the static value "Talend"; all other columns are simply mapped across.

    [Screenshot: the tMap_2 mapping]

  5. tWriteJSONField: This component reassembles our JSON, grouping on the call column from the previous component. Because every row carries the same call value, all of the data elements are grouped into a single JSON document.

    On the Basic Settings tab:

    • Output Column: response
    • Group By: Input Column call
    • Remove root node: checked

      [Screenshot: tWriteJSONField Basic Settings]

    Configure JSON Tree: We will map only the index and the VerificationLevel from the input. The VerificationLevel will tell us if the email is VALID or INVALID.

    [Screenshot: tWriteJSONField JSON tree configuration]

  6. tMap_1: The JSON payload sent by Snowflake does not contain name/value pairs for any of its data elements. Instead, it follows the model that the first value in each row array is always the index, which correlates the returned value back to its record in Snowflake. In our Job, we map that first element to the field index and the second element to the field email.

    [Screenshot: tMap_1]

    We also have to remove the names (index and VerificationLevel) from the generated response and turn each object in the array into an array itself, so that every row takes the same [index, value] form as the response example above. In this tMap we create a column called var1 that holds this final structure, and our response is then ready to go back.

    [Screenshot: the var1 expression in tMap_1]

  7. tBufferOutput: This component simply takes the column from the previous component and sends it back to our calling application.
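If it helps to see the whole flow outside Studio, here is a rough plain-Java sketch of what the Job does end to end: extract [index, email] pairs, check each email, and return [index, VerificationLevel] rows. It is a stand-in under stated assumptions, not Talend-generated code. EmailJobSketch and run are hypothetical names, the row pattern only handles this article's two-element rows (no escaped quotes inside values), and the email regex is a simplified shape check, not tVerifyEmail's default expression or its Top-Level Domain list.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical plain-Java stand-in for the Talend Job's data flow.
final class EmailJobSketch {

    // Matches one [index,"email"] row inside the "data" array; plays the
    // role of tExtractJsonFields with $.data[*], @[0] and @[1].
    private static final Pattern ROW =
        Pattern.compile("\\[\\s*(\\d+)\\s*,\\s*\"([^\"]*)\"\\s*\\]");

    // Simplified stand-in for tVerifyEmail (assumption: basic shape check only).
    private static final Pattern EMAIL =
        Pattern.compile("^[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\\.[A-Za-z]{2,}$");

    static String run(String request) {
        StringBuilder response = new StringBuilder("{\"data\":[");
        Matcher m = ROW.matcher(request);
        boolean first = true;
        while (m.find()) {
            String index = m.group(1); // index column (@[0])
            String email = m.group(2); // email column (@[1])
            String level = EMAIL.matcher(email).matches() ? "VALID" : "INVALID";
            if (!first) {
                response.append(',');
            }
            // Same [index,"VerificationLevel"] shape that tMap_1 produces.
            response.append('[').append(index).append(",\"")
                    .append(level).append("\"]");
            first = false;
        }
        return response.append("]}").toString();
    }
}
```

For example, run applied to {"data":[[0,"taylor49@adventure-works.com"],[1,"not-an-email"]]} returns {"data":[[0,"VALID"],[1,"INVALID"]]}.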

 

Create an AWS Environment

Now that we have created our Talend Job, we can prepare the AWS environment, which consists of a Lambda Function and an API Gateway. We will start with the Lambda Function and then create an API Gateway endpoint to trigger it.

  1. First, we need to build the Talend Job. To do this, right-click on the Job and select Build Job.

    [Screenshot: the Build Job option in the Job's context menu]

  2. From the next screen, we will accept the defaults and click Finish.

    [Screenshot: the Build Job dialog]

  3. Next we will create a Java project to build our Lambda Function. You can clone this example from the following GitHub repository: https://github.com/Talend/CallTalendJob.

    Note: Please review the pom.xml file for instructions on some of the dependencies.

  4. Let's review the code. Open the TalendJobDQLambdaFuncHandler class.

    [Screenshot: the TalendJobDQLambdaFuncHandler source code]

    1. First, we create a new instance of the Talend Job.

      final local_project.emailvalidation_0_1.emailValidation job = new local_project.emailvalidation_0_1.emailValidation();
    2. Next, we are going to take the request data sent from Snowflake and add it to a String array. Remember the context variable we created when we set up our Talend Job? Here we will set the value of that context variable.

      String[] var = new String[] { "--context_param input=" + requestString };
    3. Then we execute the Talend Job, passing the String array in as its arguments. The Job returns a two-dimensional String array, and the JSON payload is in its first cell.

      String[][] bufferOutput = job.runJob(var);
    4. We write the response back to the Lambda Function logs so we can verify it for debugging purposes.

      logger.log(bufferOutput[0][0]);
    5. We pass the JSON payload to the generateResponse function and return the apiGatewayProxyResponseEvent, which API Gateway then returns to Snowflake.

      generateResponse(apiGatewayProxyResponseEvent, bufferOutput[0][0]);
  5. Build the JAR file for Lambda. From the command line, in the project directory, run the following command:

     mvn clean install
  6. Now we need to create the Lambda Function in AWS, so log into your AWS Environment.
  7. Create a new Lambda Function:

    1. Function Name: callTalendDQJob
    2. Runtime: Java 8
    3. Click the Create Function button.

      [Screenshot: the Create function page]

       

  8. Configure our Lambda Function:

    1. Handler: com.talend.job.dq.TalendJobDQLambdaFuncHandler::handleRequest
    2. Upload Jar file: Select the CallTalendJob-1.0.0.jar file under the target directory from your build.
    3. Click Save to start the upload and save the function.

  9. Create the API Gateway Trigger:

    1. From the Lambda Function Designer, click Add trigger.

      [Screenshot: Add trigger in the Lambda Function Designer]

    2. Next, select API Gateway from the drop-down list.

      [Screenshot: API Gateway selected as the trigger]

    3. Now we configure the API Gateway. For this demo, select Open for Security, which leaves the endpoint callable without authentication; you could select IAM instead.

      [Screenshot: API Gateway trigger configuration]

    4. Finally, our Lambda Function should look like the following:

      [Screenshot: the Lambda Function with its API Gateway trigger]
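The core of the handler reviewed in step 4 can be sketched without the AWS event types, which makes the argument passing easy to see and test. JobRunner is a hypothetical interface standing in for the generated Job class (which, as in step 4, exposes runJob taking a String array and returning a String[][]). HandlerSketch, invoke, and CONTEXT_PREFIX are illustrative names, and the empty-output check is an addition that the handler in the repository may not have.

```java
// Hypothetical stand-in for the Job class produced by Build Job.
interface JobRunner {
    String[][] runJob(String[] args);
}

final class HandlerSketch {

    // Same argument shape as the handler: a single element holding
    // "--context_param input=<request body>".
    static final String CONTEXT_PREFIX = "--context_param input=";

    // Passes the Snowflake request body to the Job as the "input" context
    // variable and returns the JSON that tBufferOutput wrote to row 0, column 0.
    static String invoke(JobRunner job, String requestString) {
        String[] jobArgs = new String[] { CONTEXT_PREFIX + requestString };
        String[][] bufferOutput = job.runJob(jobArgs);
        // Fail loudly instead of with an ArrayIndexOutOfBoundsException.
        if (bufferOutput == null || bufferOutput.length == 0
                || bufferOutput[0] == null || bufferOutput[0].length == 0) {
            throw new IllegalStateException("Job returned no buffered output");
        }
        return bufferOutput[0][0];
    }
}
```

Keeping this logic separate from the Lambda types also lets you unit test it with a fake JobRunner before deploying.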

       

Set Up the Snowflake Environment

Now that we have the Talend Job, the AWS Lambda Function, and the API Gateway set up, we are ready to set up our Snowflake environment and run our query to validate our emails.

  1. First, we are going to need two pieces of information from our AWS environment.

    1. Our API Gateway Role ARN: In the IAM console, click Roles on the left, then type the name of our Lambda Function, callTalendDQJob, into the search box. Click the role and copy its ARN value.

      [Screenshot: searching for the role under IAM Roles]

      [Screenshot: the role summary showing the Role ARN]

    2. Our API endpoint: From the Lambda Function, click on the API Gateway Trigger and expand the details. Now copy our API endpoint:

      [Screenshot: the API Gateway trigger details showing the API endpoint]

  2. In your Snowflake Data Warehouse, run the following statement:

    create or replace api integration tlnd_job_aws_gw
        API_PROVIDER = aws_api_gateway
        API_AWS_ROLE_ARN = '<API Gateway Role ARN>'
        API_ALLOWED_PREFIXES = ('<API EndPoint>')
        ENABLED = TRUE;

    For API_ALLOWED_PREFIXES, you can remove the trailing callTalendDQJob from the API endpoint, because we are simply telling Snowflake that it may call any endpoint under the default/ path.

  3. Now that the API Integration exists in Snowflake, we must update the trust relationship of our API Gateway role so that Snowflake is allowed to assume it and call our API Gateway. For that, we need two values from the new API Integration. To get them, execute the following query:

    desc integration tlnd_job_aws_gw;

    The result will look similar to this. We need to get the values for API_AWS_IAM_USER_ARN and API_AWS_EXTERNAL_ID:

    [Screenshot: desc integration output showing API_AWS_IAM_USER_ARN and API_AWS_EXTERNAL_ID]

  4. Now go back to the API Gateway role in IAM, where we got the ARN, and open the Trust relationships tab. Click Edit trust relationships and paste the following policy, replacing the marked placeholders with the values you captured in the previous step. Then click Update Trust Policy.

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Effect": "Allow",
          "Principal": {
            "Service": "lambda.amazonaws.com"
          },
          "Action": "sts:AssumeRole"
        },
        {
          "Sid": "",
          "Effect": "Allow",
          "Principal": {
            "AWS": "<API_AWS_IAM_USER_ARN>"
          },
          "Action": "sts:AssumeRole",
          "Condition": {
            "StringEquals": {
              "sts:ExternalId": "<API_AWS_EXTERNAL_ID>"
            }
          }
        }
      ]
    }
  5. Create the External Function by executing the following statement in your Snowflake environment, this time using the entire API endpoint URL (don't remove callTalendDQJob). We set MAX_BATCH_ROWS=500, which guarantees that no more than 500 rows are sent in any single request. You can tune this value based on your Talend Job; I chose 500 to make sure the demo produced a good number of calls.

    create or replace external function talendjob_emailvalidation(string_col varchar)
        returns varchar
        IMMUTABLE
        api_integration = tlnd_job_aws_gw
        MAX_BATCH_ROWS=500
        as '<API EndPoint>';
  6. Now we can finally execute a Snowflake query with our new External Function. In our example, I have a table with 134,348,800 records, and we are going to validate the emails in that table. We can do this by executing the following query:

    select talendjob_emailvalidation(emailaddress) from customers;

    This execution processed all 134,348,800 rows in 174 seconds, or about 772,120 rows/second.

  7. If you wish to see the details of the executions, open the Lambda Function's logs in Amazon CloudWatch Logs, where you can see what was sent, the response, and the execution time.
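The batch limit and the timing above also give a quick sanity check on the scale of this run. The sketch below, with hypothetical names BatchMath, minRequests, and rowsPerSecond, computes a lower bound on the number of requests Snowflake had to send (MAX_BATCH_ROWS is a maximum, so batches may be smaller and the real count can be higher) and the average throughput.

```java
// Back-of-envelope arithmetic for an external function run.
final class BatchMath {

    // Lower bound on HTTP requests: at most maxBatchRows rows per request.
    static long minRequests(long rows, long maxBatchRows) {
        if (rows < 0 || maxBatchRows <= 0) {
            throw new IllegalArgumentException("rows >= 0 and maxBatchRows > 0 required");
        }
        return (rows + maxBatchRows - 1) / maxBatchRows; // ceiling division
    }

    // Average throughput over the whole query.
    static double rowsPerSecond(long rows, double seconds) {
        return rows / seconds;
    }
}
```

For this run, minRequests(134_348_800L, 500L) returns 268,698, so API Gateway and the Lambda Function handled at least that many calls, and rowsPerSecond(134_348_800L, 174.0) rounds to 772,120, matching the figure reported above.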

As more applications move to a Cloud-first approach, I believe we are starting to see these platforms and services coalesce. As the developer or citizen integrator, you can concentrate your effort on two very important aspects: Time vs Money. How fast do you need your results, and how much are you willing to spend to achieve them? As you have seen, you can now embed your Talend Job in a highly scalable infrastructure and, when joined with Snowflake, get highly concurrent execution of your Jobs.
