GeoJson Operations in Apache Spark with Seahorse SDK

A few days ago we released Seahorse 1.4, an enhanced version of our machine learning, Big Data manipulation and data visualization product.

This release also comes with an SDK – a Scala toolkit for creating new custom operations to be used in Seahorse.

As a showcase, we will create a custom Geospatial operation working with GeoJson and build a simple Seahorse workflow using it. This use case is inspired by Example 8 from the book Advanced Analytics with Spark.

Geospatial data in GeoJson format

GeoJson can encode many geographic data types. For example:

  • Location Point
  • Line
  • Region described by a polygon

GeoJson encodes geospatial data using JSON.
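For instance, a point and a polygon look like this (the coordinate values are illustrative; per the GeoJson specification, coordinates are given in [longitude, latitude] order):

```json
{ "type": "Point", "coordinates": [-73.97, 40.77] }

{ "type": "Polygon", "coordinates": [
  [ [-74.02, 40.70], [-73.93, 40.70], [-73.93, 40.88],
    [-74.02, 40.88], [-74.02, 40.70] ]
] }
```

Note that a polygon's outer ring repeats its first coordinate at the end to close the shape.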

New York City Taxi Trips

In our workflow we will use a New York Taxi Trip dataset with pickup and drop-off location points.

Let’s say that we want to know how many taxi trips started in Manhattan and ended in the Bronx. To achieve this, we could use an operation that filters the dataset by whether a location point is contained inside a given geographic region. Let’s call this operation ‘Filter Inside Polygon’.

Manhattan GeoJson polygon Visualization

Manhattan GeoJson Polygon visualization using geojson.io

Bronx GeoJson polygon Visualization

Bronx GeoJson Polygon visualization using geojson.io

You can download GeoJson data with New York boroughs from https://github.com/dwillis/nyc-maps.

Implementing a custom operation using the Seahorse SDK involves the following steps:

  1. Clone Seahorse SDK Example Git Repository.
  2. Implement custom Geospatial operation.
  3. Assemble a JAR with the custom Geospatial operation.
  4. Add the assembled JAR to a Seahorse instance.
  5. Use custom Geospatial operation in a Workflow.

Seahorse SDK Example Git Repository

The fastest way to start developing a custom Seahorse operation is to clone our SDK Example Git Repository and write your code from there.

The Seahorse SDK Example Repository has all Seahorse and Apache Spark dependencies already defined in an SBT build file definition.

Let’s add the geospatial data library to our dependencies in the build.sbt file:
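A sketch of that change, assuming the esri-geometry-api artifact used by the Advanced Analytics with Spark example (check the version number against your build):

```scala
// build.sbt – Esri geometry library used for parsing GeoJson
libraryDependencies += "com.esri.geometry" % "esri-geometry-api" % "1.2.1"
```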

Now we can implement the FilterInsidePolygon operation:

  1. We need to annotate the operation with @Register so that Seahorse knows that it should be registered in the operation catalogue.
  2. We extend DOperation1To1[DataFrame, DataFrame] because our operation takes one DataFrame as an input and returns one DataFrame as the output.
  3. A UUID is used to uniquely identify this operation.
    Even if the operation changes later (e.g. its name), its id should not change – Seahorse uses it to recognize that it’s the same operation as before.
  4. Parameter geoJsonPointColumnSelector will be used for selecting a column with Point Location data.

Parameter geoJsonPolygon represents a Polygon we will be checking location points against.
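Put together, the class might look roughly like the sketch below. Only @Register, DOperation1To1[DataFrame, DataFrame], the id field and the two parameter names come from the steps above; the parameter helper types (SingleColumnSelectorParam, StringParam) and the UUID value are hypothetical placeholders, so check them against the SDK example sources:

```scala
// Sketch only – parameter helper names are hypothetical placeholders.
@Register // 1. tells Seahorse to add the operation to its catalogue
class FilterInsidePolygon extends DOperation1To1[DataFrame, DataFrame] { // 2.

  // 3. a stable UUID; keep it unchanged even if the operation is renamed
  override val id: Id = "00000000-0000-0000-0000-000000000000" // placeholder

  override val name: String = "Filter Inside Polygon"

  // 4. column holding the GeoJson location points
  val geoJsonPointColumnSelector = SingleColumnSelectorParam(
    name = "GeoJson point column")

  // polygon (as a GeoJson string) that the points are tested against
  val geoJsonPolygon = StringParam(
    name = "GeoJson polygon")
}
```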

Now we can implement the execute method with the actual Geospatial and Apache Spark DataFrame code.

  1. We parse our polygon data using the GeometryEngine class from the esri-geometry-api library.
  2. Inside the Apache Spark DataFrame filter we use the GeometryEngine class again to parse the location point in each row.
  3. We use the GeometryEngine class once more to test whether the point is contained inside the specified polygon.
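Those three steps could be sketched as follows. GeometryEngine.geometryFromGeoJson and GeometryEngine.contains are real esri-geometry-api calls, but the Seahorse method signature and the parameter accessors (getGeoJsonPolygon, pointColumnName) are illustrative assumptions:

```scala
import com.esri.core.geometry.{Geometry, GeometryEngine, SpatialReference}

override protected def execute(df: DataFrame)(ctx: ExecutionContext): DataFrame = {
  val wgs84 = SpatialReference.create(4326) // standard lat/lon reference

  // 1. parse the polygon parameter once, outside the row-level filter
  val polygon = GeometryEngine.geometryFromGeoJson(
    getGeoJsonPolygon, 0, Geometry.Type.Polygon).getGeometry

  df.filter { row =>
    // 2. parse the GeoJson point stored in the selected column
    val point = GeometryEngine.geometryFromGeoJson(
      row.getAs[String](pointColumnName), 0, Geometry.Type.Point).getGeometry
    // 3. keep only rows whose point lies inside the polygon
    GeometryEngine.contains(polygon, point, wgs84)
  }
}
```

Parsing the polygon once outside the filter matters: parsing it per row would repeat the same work for every record in the DataFrame.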

Now that the implementation is finished, you can build a JAR and add it to Seahorse:

  1. Run sbt assembly. This produces a JAR in the target/scala-2.11 directory.
  2. Put this JAR in $SEAHORSE/jars, where $SEAHORSE is the directory containing docker-compose.yml or Vagrantfile (depending on whether you run Seahorse with Docker or Vagrant).
  3. Restart Seahorse (by either stopping and starting docker-compose, or running vagrant halt followed by vagrant up).
  4. Operations are now visible and ready to use in Seahorse Workflow Editor.

And now that we have Filter Inside Polygon, we can start implementing our workflow:

NYC Taxi Trips Seahorse Workflow

First we need to specify our datasource. You can download the whole 26GB dataset from http://www.andresmh.com/nyctaxitrips.

Since we are using a local Apache Spark master, we sampled 100k rows from the whole dataset.

Additionally, we transformed the latitude and longitude columns into a single GeoJson column for each of the pickup and drop-off locations.
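That preprocessing step amounts to formatting each longitude/latitude pair as a GeoJson Point string, with longitude first. A minimal pure-Scala sketch of the conversion (the function name is illustrative):

```scala
// Build a GeoJson Point string from longitude/latitude values.
// GeoJson coordinate order is [longitude, latitude].
def toGeoJsonPoint(longitude: Double, latitude: Double): String =
  s"""{"type":"Point","coordinates":[$longitude,$latitude]}"""

// e.g. toGeoJsonPoint(-73.97, 40.77)
// → {"type":"Point","coordinates":[-73.97,40.77]}
```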

You can download the final preprocessed dataset used for this experiment from here.

Building our Workflow

Let’s start by defining our Datasource in Seahorse:

NYC Taxi Trips Datasource Parameters

Next, we use NYC borough GeoJson polygon values from https://github.com/dwillis/nyc-maps as attributes in our Filter Inside Polygon operations:

Filter Inside Polygon operation parameters

Finally, let’s run our workflow and see how many taxi trips started in Manhattan and ended in the Bronx:

NYC Taxi Trips Workflow Node Report

As we can see, in our 100k-row sample only 383 trips started in Manhattan and ended in the Bronx.

Summary

We cloned the Seahorse SDK Example repository and, starting from it, implemented a custom Filter Inside Polygon operation in Scala. Then we built a JAR file with the operation and added it to Seahorse. Finally, we built a workflow that used the custom Filter Inside Polygon operation on Apache Spark DataFrames containing GeoJson data.

