
- Mark as New
- Bookmark
- Subscribe
- Mute
- Subscribe to RSS Feed
- Permalink
- Report Inappropriate Content
Sending Slack messages with Talend, part 2: Collecting your Slack data
Jan 31, 2024 9:03:47 AM
Feb 7, 2022 12:17:35 PM
In part one of this series, you looked at preparing Slack to allow a developer to use its APIs. There wasn’t much Talend development involved (none, apart from a routine that you might find useful for encryption/decryption), but hopefully those of you who are familiar with Talend API Services or ESB products have already jumped ahead to try out the Slack APIs. The next part of the process is a Job to collect user and channel data from the Slack API.
Content:
Retrieving key Slack information to send messages
Now that you have the Slack token you acquired in the first blog, you can’t just jump into sending messages. You have some other work to do first.
Unfortunately, Slack will not allow you to simply send messages by username or channel name. You need to find the channel ID for channels, and you need to open (or resume) a conversation for direct messages to individuals. This process will pass you a channel ID for your direct message conversation.
To retrieve this information and to open conversations, I have put together a small Talend Job that runs before I want to send messages. Once a channel ID has been retrieved or a conversation has been initiated, the channel ID will not change, so this Job does not have to be run every time you want to send messages. It needs to run once at the beginning of the project, then every time users or channels are added, so I run it once a day to ensure that we are not missing any data when we want to send messages.
The Job looks like this:
This Job also requires a context variable to be configured. For this version, I have set the context variable in the Contexts tab as shown below:
The SlackOAuthAccessToken context variable value is the token you created in the previous blog. In this example, I am adding it in the Contexts tab, but it should probably be stored elsewhere in your Job in an encrypted state, then decrypted when it’s used in your Job. In the previous blog I also gave an example routine to handle the encryption.
Once you have the context variable for this Job sorted, you are ready to move on to building the Job. Since there are a lot of components in this Job, I am not going to go into great detail on all of them. Some will need more detail than others and some (like the tLogRow components) need no explanation.
Each component I discuss will be linked to its number in the screenshot. Some components do exactly the same thing in different places, the descriptions of those may be written in the same place.
1 | tJava (set up variable) | This component initiates any variables that are needed for the first part of the Job. You will use a globalMap value to store a cursor that identifies the position of a set of paged records from the Slack API. This is initialized here:
//Set next_cursor globalMap globalMap.put("next_cursor", ""); |
||||||||||
2, 12 | tLoop | These components do exactly the same job and are configured in exactly the same way. They essentially iterate over API calls until there is no longer a next_cursor value returned. The configuration is set as follows:
Loop Type: While Declaration: int i=0 Condition: ((String)globalMap.get("next_cursor"))!=null Iteration: i++ |
||||||||||
3, 13 | tJava | These components do the same job. They build the URL that will be fired by the tRestClient components at 4 and 14. The code for 3 can be seen below. It is building a URL for this API method (https://api.slack.com/methods/users.list): if(((Integer)globalMap.get("tLoop_1_CURRENT_ITERATION")).intValue()==0){ globalMap.put("url", "https://slack.com/api/users.list"); }else{ globalMap.put("url", "https://slack.com/api/users.list?cursor="+((String)globalMap.get("next_cursor"))); }The code for 13 is shown below. It is building a URL for this API method (https://api.slack.com/methods/conversations.list): if(((Integer)globalMap.get("tLoop_2_CURRENT_ITERATION")).intValue()==0){ globalMap.put("url", "https://slack.com/api/conversations.list?types=public_channel%2Cprivate_channel"); }else{ globalMap.put("url", "https://slack.com/api/conversations.list?types=public_channel%2Cprivate_channel& cursor="+((String)globalMap.get("next_cursor"))); } |
||||||||||
4, 14 | tRestClient | These components are configured in exactly the same way: they make the API call and return the response. Below you can see how the Basic settings are configured: The Advanced settings are configured as follows: Note the HTTP Headers that are set. The Authorization header is where your Slack Access Token will go, preceded by “Bearer “. Notice that I have used a context variable for this; it can be configured however you wish, so long as it is there. |
||||||||||
6, 16 | tExtractJsonFields | These components are used to extract the next_cursor value from the JSON returned by the tRestClient components. If no value is returned, it means that this is the last page. If a value is returned, it means another page is expected and the next_cursor value is used in the next API call. These components have two columns in their schemas: string and next_cursor. They are configured as shown below: The JSON Query for the next_cursor column is "response_metadata.next_cursor". |
||||||||||
7, 17 | tJavaFlex | These components take the values returned by the tExtractJSONFields components preceding them, pass on the string column (which holds the JSON from the API call) and update the next_cursor globalMap value. You can see the code below:
if(row9.next_cursor!=null && row9.next_cursor.trim().compareToIgnoreCase("")!=0){ globalMap.put("next_cursor", row9.next_cursor); }else{ globalMap.put("next_cursor", null); }Remember to select the Data Auto Propagate check box on the Basic settings tab. |
||||||||||
8, 18 | tExtractJSONFields | These components retrieve the user and channel data from the JSON retrieved by the tRestClient components. They are essentially configured in the same way, other than the Loop Jsonpath query field. For component 8, this field is set to: "$.members[*]"For component 18, this field is set to: "$.channels[*]" |
||||||||||
9, 19 | tMap | These tMap components prepare the schema for the data to be passed to the tHashOutput components that follow. The schema is shown below:
The mapping can be seen below. There is a subtle difference that occurs in the type output: For component 9 this value is “USER”, for component 19 it is “CHANNEL”. This allows us to keep the data in the same recordset while being able to identify the type of record. |
||||||||||
10, 20 | tHashOutput | These components are easily configured, as they take the schema of the previous components. However, they are subtly different in how they are configured so that they can work together. Component 10 is the tHashOutput_1 and component 20 is the tHashOutput_2. The configuration of tHashOutput_1 is below. Notice that the Append check box is selected. The configuration of tHashOutput_2 is below. For this component, the Link with a tHashOutput check box is selected. This reveals the Component list dropdown, where tHashOutput_1 is selected. The two components are configured to store the different flows of data together. |
||||||||||
21 | tHashInput | This component is used to read in the data collected in the two previous subJobs. The schema is copied from tHashOutput_1 and tHashOutput_1 is selected from the Component list drop-down list. |
||||||||||
22 | tMap | This component is used to filter out users and channels that already exist in our Slack_Data dataset. Our Slack_Data dataset is covered by the component 23 description. Only the records that do not already exist are passed through. The mapping is shown below. Pay attention to the Join Model and the Catch lookup inner join reject setting. This only outputs records from the main input that are not matched. The output expressions can be seen below:
The channel ID for channels is the same as the channel’s ID. It does not need a further lookup, whereas users do need this further lookup. |
||||||||||
23,25 | tDBInput (Snowflake) | This component is used to query our Slack_Data dataset. This example shows a Snowflake database, but you can use whatever database you wish. You can even modify this example to use a flat file data source, although you would not be able to update that and would have to recreate the complete dataset each time. The schema of the Slack_Data table is shown below: create or replace TABLE TALENDCDW.TCOMMUNITY.SLACK_DATA ( ID VARCHAR(25) NOT NULL, NAME VARCHAR(256) NOT NULL, TYPE VARCHAR(25) NOT NULL, CHANNELID VARCHAR(25), primary key (ID) ); The tDBInput component is used to query the table above. It returns the complete dataset, so doesn't apply filters or a WHERE CLAUSE. |
||||||||||
24 | tDBOutput (Snowflake) | This component is used to insert the data. You can see the configuration below: Blacked out parameters should be set as context variables. Change the Output Action from UPSERT to INSERT to increase efficiency. |
||||||||||
26 | tMap | This component is used to filter the records so that only users that do not already have a channel ID are selected to be processed. The tMap takes the data from the tHashInput described above and joins it to data that you have previously collected (see component 23). Any users that do not already have a channel ID are passed to the next component in the flow. The mapping is below: Notice the configuration of the join. It is an Inner Join between the Slack Name from our main dataset and the Slack column from our lookup source. The format is @{Slack Name} for users, and #{Channel Name} for channels. The output filter is as follows: row16.CHANNELID== null && row16.TYPE.compareToIgnoreCase("USER")==0 |
||||||||||
27 | tDBInput (Snowflake) | This component is used to query which Slack users you need to get further data for. For example, this could be a flat file with a list of Slack users you want a channel ID for, it could be a database table, anything you like. In this case I am using data from a Snowflake database table to return a list of Slack names. The tMap described above explains how these are used. | ||||||||||
29 | tFlowToIterate | This component is used to iterate over the data returned by the tMap. Each row is stored in a series of globalMap key/value pairs. Each row is used to query the Slack API in the next section. The iterate link means that every component after this one is fired between each iteration. | ||||||||||
30 | tJava | This component is used to create the required JSON to supply parameters to the Slack API. The JSON supplies the user ID for the Slack user you want more information on. The code is as follows:
String post = "{\r\n\t\"users\": \""+((String)globalMap.get("row18.ID"))+"\"\r\n}"; globalMap.put("post", post); |
||||||||||
31 | tFixedFlowInput | This component is used to supply the JSON payload to the tRestClient component that is calling the Slack API. The component is configured as follows: Notice that a schema of two columns is configured. One is a null body column that is configured as a Document, and the other is a string column that is populated with the following code: ((String)globalMap.get("post"))This retrieves the JSON that was created in the previous tJava component. |
||||||||||
32 | tRestClient | This component is used to call the Slack API. It is configured in a similar way to components 4 and 14, but it is sending a POST request instead of a GET request. The Basic settings are configured as follows:
|
||||||||||
34 | tExtractJSONFields | This component is used to retrieve the channel ID from the returned JSON. You need to create a String id column and configure the Mapping section:
|
||||||||||
35 | tMap | This component takes the output from the tExtractJSONFields component and adds values that have been held in the globalMap HashMap from the tFlowToIterate component (component 29). The mapping is as follows: The mapping values can be copied from the table below, but remember that row names are not guaranteed to be the same:
|
||||||||||
36 | tDBOutput (Snowflake) | This component is used to update the users that already exist in this database table but whose channel ID values are missing. The configuration of this component is shown below: Notice that the Output Action is set to UPDATE. The schema for this table is described with components 23 and 25. |
After you have created this Job to collect Slack data, you have all the information you need to start sending Slack messages to users and channels.
Running the Job
This Job is required to gather the user and channel ID data to allow you to send messages to the entities. It doesn't need to be run for every message sent, but must be run when new users and/or channels are created. Therefore, you should run this Job once per day. This Job can be run in Talend Studio or on Talend Cloud. All you need is to set the SlackOAuthAccessToken context variable, then run the Job. When the Job is finished running, your target data repository will be populated with all of your user and channel data. This example uses a Snowflake table for the data repository, but you can use whatever you prefer.
Next
Continue to Part 3: Start sending Slack messages with Talend