Skip to content

Commit 6cbff78

Browse files
Added the mySparkStreamingApp.
1 parent 28b52e2 commit 6cbff78

4 files changed

Lines changed: 94 additions & 0 deletions

File tree

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
2+
Microsoft Visual Studio Solution File, Format Version 12.00
3+
# Visual Studio Version 16
4+
VisualStudioVersion = 16.0.30717.126
5+
MinimumVisualStudioVersion = 10.0.40219.1
6+
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "mySparkStreamingApp", "mySparkStreamingApp\mySparkStreamingApp.csproj", "{774336F5-C408-40E3-9E45-BE2266E5AA31}"
7+
EndProject
8+
Global
9+
GlobalSection(SolutionConfigurationPlatforms) = preSolution
10+
Debug|Any CPU = Debug|Any CPU
11+
Release|Any CPU = Release|Any CPU
12+
EndGlobalSection
13+
GlobalSection(ProjectConfigurationPlatforms) = postSolution
14+
{774336F5-C408-40E3-9E45-BE2266E5AA31}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
15+
{774336F5-C408-40E3-9E45-BE2266E5AA31}.Debug|Any CPU.Build.0 = Debug|Any CPU
16+
{774336F5-C408-40E3-9E45-BE2266E5AA31}.Release|Any CPU.ActiveCfg = Release|Any CPU
17+
{774336F5-C408-40E3-9E45-BE2266E5AA31}.Release|Any CPU.Build.0 = Release|Any CPU
18+
EndGlobalSection
19+
GlobalSection(SolutionProperties) = preSolution
20+
HideSolutionNode = FALSE
21+
EndGlobalSection
22+
GlobalSection(ExtensibilityGlobals) = postSolution
23+
SolutionGuid = {13246FAA-A546-4B4B-8E52-F6D65C265691}
24+
EndGlobalSection
25+
EndGlobal
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
using System;
2+
using Microsoft.Spark.Sql;
3+
using Microsoft.Spark.Sql.Streaming;
4+
using static Microsoft.Spark.Sql.Functions;
5+
6+
namespace mySparkStreamingApp
7+
{
8+
class Program
9+
{
10+
static void Main(string[] args)
11+
{
12+
//Console.WriteLine("Hello World!");
13+
// Default to running on localhost:9999
14+
string hostname = "localhost";
15+
int port = 9999;
16+
17+
// User designated their own host and port
18+
if (args.Length == 2)
19+
{
20+
hostname = args[0];
21+
port = int.Parse(args[1]);
22+
}
23+
24+
// Create Spark session
25+
SparkSession spark = SparkSession
26+
.Builder()
27+
.AppName("Streaming example with a UDF")
28+
.GetOrCreate();
29+
30+
// Create initial DataFrame
31+
DataFrame lines = spark
32+
.ReadStream()
33+
.Format("socket")
34+
.Option("host", hostname)
35+
.Option("port", port)
36+
.Load();
37+
38+
// UDF to produce an array
39+
// Array includes:
40+
// 1) original string
41+
// 2) original string + length of original string
42+
Func<Column, Column> udfArray =
43+
Udf<string, string[]>((str) => new string[] { str, $"{str} {str.Length}" });
44+
45+
// Explode array to rows
46+
DataFrame arrayDF = lines.Select(Explode(udfArray(lines["value"])));
47+
48+
// Process and display each incoming line
49+
StreamingQuery query = arrayDF
50+
.WriteStream()
51+
.Format("console")
52+
.Start();
53+
54+
query.AwaitTermination();
55+
}
56+
}
57+
}
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
<Project Sdk="Microsoft.NET.Sdk">
2+
3+
<PropertyGroup>
4+
<OutputType>Exe</OutputType>
5+
<TargetFramework>netcoreapp3.1</TargetFramework>
6+
</PropertyGroup>
7+
8+
<ItemGroup>
9+
<PackageReference Include="Microsoft.Spark" Version="1.0.0" />
10+
</ItemGroup>
11+
12+
</Project>

Azure/DotNet4ApacheSpark/mySparkStreamingApp/text.txt

Whitespace-only changes.

0 commit comments

Comments
 (0)