In the latest Seahorse release we introduced scheduling of Spark jobs. In this post we will show you how to use it to regularly collect data and send reports generated from that data via email.
Use case
Let’s say that we have a local weather station, and the data from this station is uploaded automatically to Google Sheets. The spreadsheet contains only one row of data (row number 2), which always holds the current weather characteristics. We would like to generate graphs from this data every now and then, without any repeated manual effort.
Solution
Collecting data
The first workflow that we create collects a snapshot of live data and appends it to a historical data set. It will be run at regular intervals. Let’s start by creating two data sources in Seahorse. The first data source represents the Google Spreadsheet with live-updated weather data.
For the historical weather data, we create a second data source, representing a file stored locally within Seahorse. Initially it contains only a header and one row: the current data.

Now we can create our first Spark job, which reads both data sources, concatenates them and writes the result back to the “Historical weather data” data source. This way, a new data row is appended to “Historical weather data” every time the workflow is run.
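Seahorse builds this job from visual operations, but its logic is roughly equivalent to the following PySpark sketch (a minimal illustration that assumes an available spark session and hypothetical CSV paths, not Seahorse’s actual configuration):

# Illustrative sketch only; the file paths are hypothetical.
live = spark.read.csv("live_weather.csv", header=True, inferSchema=True)
historical = spark.read.csv("historical_weather.csv", header=True, inferSchema=True)

# Append the current snapshot; both frames must share the same column layout.
updated = historical.union(live)

# Persist the extended data set to a new location, since Spark cannot
# overwrite an input it is still reading from.
updated.write.csv("historical_weather_updated", header=True, mode="overwrite")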
There is one more step left: this Spark job needs to be scheduled. We set it to run every half an hour, and after every scheduled job execution an email is sent.
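Seahorse configures the schedule through its UI; purely to make the frequency concrete, an “every half an hour” rule looks like this in standard cron notation (an illustration, not Seahorse’s configuration format):

# Run every 30 minutes.
*/30 * * * *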
The email contains a link to that workflow, where we are able to see node reports from the execution.
Sending a weather report
The next workflow is very simple: it consists of a Read DataFrame operation and a Python Notebook. It is kept separate from the data collection because we want to send reports at a much lower rate. In the notebook, we first transform the data a little:
import pandas
import numpy
import matplotlib
import matplotlib.pyplot as plt
%matplotlib inline

# Convert the Spark DataFrame provided by Seahorse to a pandas DataFrame.
data = dataframe().toPandas()
# Parse the timestamps and order the rows chronologically.
data['time'] = pandas.to_datetime(data['time'])
data = data.sort_values('time')
Next, we prepare a function that will be used to line-plot a time series of weather characteristics like temperature, air pressure and humidity:
def plot(col_to_plot):
    # Hide the top and right plot borders and keep ticks on the remaining sides.
    ax = plt.axes()
    ax.spines["top"].set_visible(False)
    ax.spines["right"].set_visible(False)
    ax.get_xaxis().tick_bottom()
    ax.get_yaxis().tick_left()
    # Show plain values on the y-axis instead of an offset.
    y_formatter = matplotlib.ticker.ScalarFormatter(useOffset=False)
    ax.yaxis.set_major_formatter(y_formatter)
    # Keep both axes to around five tick labels.
    plt.locator_params(axis='x', nbins=5)
    plt.locator_params(axis='y', nbins=5)
    plt.yticks(fontsize=14)
    plt.xticks(fontsize=14)
    plt.plot(data['time'], data[col_to_plot])
    plt.gcf().autofmt_xdate()
    plt.show()
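With this helper in place, each graph is produced by a single call. The column names below are illustrative; they stand for whatever characteristics the weather station reports:

# Plot the collected weather characteristics (hypothetical column names).
plot('temperature')
plot('pressure')
plot('humidity')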
After editing the notebook, we set up its parameters so that reports are sent to our email address after each run.

Finally, we can set up a schedule for our report. It is similar to the previous one, only executed less often: we send a report every day in the evening. After each execution two emails are sent: one, as previously, with a link to the executed workflow, and a second one containing the notebook execution result.
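Again purely as an illustration of the frequency in cron notation (hypothetical, since the schedule itself is configured in Seahorse), a daily evening run could read:

# Run every day at 8 p.m.
0 20 * * *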

Result
After some time, we have enough data to plot it. Here is a sample from the “Historical weather data” data source…

… and the graphs that are in the executed notebook:
