1 /*
2  * Copyright (c) 2017-2019 sel-project
3  *
4  * Permission is hereby granted, free of charge, to any person obtaining a copy
5  * of this software and associated documentation files (the "Software"), to deal
6  * in the Software without restriction, including without limitation the rights
7  * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
8  * copies of the Software, and to permit persons to whom the Software is
9  * furnished to do so, subject to the following conditions:
10  *
11  * The above copyright notice and this permission notice shall be included in all
12  * copies or substantial portions of the Software.
13  *
14  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
15  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
16  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
17  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
18  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
19  * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
20  * SOFTWARE.
21  *
22  */
23 /**
24  * Copyright: Copyright (c) 2017-2019 sel-project
25  * License: MIT
26  * Authors: Kripth
27  * Source: $(HTTP github.com/sel-project/selery/source/selery/node/handler.d, selery/node/handler.d)
28  */
29 module selery.node.handler;
30 
31 debug import core.thread : Thread;
32 
33 static import std.concurrency;
34 import std.conv : to;
35 import std.datetime : dur, msecs;
36 import std.socket;
37 import std.system : Endian;
38 import std.variant : Variant;
39 
40 import sel.net.modifiers : LengthPrefixedStream;
41 import sel.net.stream : TcpStream;
42 
/**
 * Stream type used by `SocketHandler`: `LengthPrefixedStream` instantiated
 * with a `uint` length type and little-endian byte order.
 */
alias HncomStream = LengthPrefixedStream!(uint, Endian.littleEndian);
44 
/**
 * Base class for the objects that exchange packets with the hub.
 *
 * Every constructed handler registers itself as the shared instance,
 * so the most recently constructed one is returned by `sharedInstance`.
 */
abstract class Handler {

	// Last handler that has been constructed (assigned in the constructor).
	private static shared(Handler) n_shared_instance;

	/**
	 * Returns: the handler that was constructed last, or null if
	 * no handler has been constructed yet.
	 */
	public static nothrow @property @safe @nogc shared(Handler) sharedInstance() {
		return n_shared_instance;
	}

	/**
	 * Registers the instance being constructed as the shared instance.
	 */
	public shared this() {
		n_shared_instance = this;
	}

	/**
	 * Blocks until a packet is available.
	 * Returns: the content of the received packet
	 */
	public shared abstract ubyte[] receive();

	/**
	 * Receives packets forever and forwards an immutable copy of
	 * each one to the given thread as a message.
	 * This function never returns and does not start a thread by
	 * itself: it runs in the thread that calls it (which, in debug
	 * builds, is renamed to "hncom_client").
	 * Params:
	 * 		server = thread that receives the packets
	 */
	public shared void receiveLoop(std.concurrency.Tid server) {
		debug Thread.getThis().name = "hncom_client";
		for(;;) {
			// messages must be immutable to be passed between threads
			immutable(ubyte)[] packet = this.receive().idup;
			std.concurrency.send(server, packet);
		}
	}

	/**
	 * Sends a packet to the hub.
	 * Returns: the amount of bytes sent
	 */
	public shared synchronized abstract ptrdiff_t send(ubyte[] buffer);

	/**
	 * Terminates the connection with the hub.
	 */
	public shared abstract void close();

}
85 
/**
 * Handler that communicates with the hub through a blocking TCP socket
 * wrapped in an `HncomStream`.
 */
class SocketHandler : Handler {

	private HncomStream stream;

	// NOTE(review): these two fields are not referenced anywhere in this
	// class; kept because other code in the module may still rely on them.
	private ubyte[] n_next;
	private size_t n_next_length = 0;

	/**
	 * Creates a TCP socket, connects it to the given address and wraps
	 * it in the stream used by `receive` and `send`.
	 * Params:
	 * 		address = address of the hub
	 * Throws: whatever the socket throws when it cannot be configured
	 * 		or connected; the socket is closed before the exception
	 * 		propagates.
	 */
	public shared this(Address address) {
		super();
		Socket socket = new TcpSocket(address.addressFamily);
		// do not keep the descriptor open when the setup below fails
		scope(failure) socket.close();
		socket.setOption(SocketOptionLevel.SOCKET, SocketOption.REUSEADDR, true);
		//socket.setOption(SocketOptionLevel.SOCKET, SocketOption.RCVTIMEO, dur!"seconds"(5));
		//socket.setOption(SocketOptionLevel.SOCKET, SocketOption.SNDTIMEO, dur!"seconds"(2));
		socket.blocking = true;
		socket.connect(address);
		this.stream = cast(shared)new HncomStream(new TcpStream(socket, 8192));
	}

	/**
	 * Receives the next packet from the stream.
	 * The call is delegated to the unshared view of the stream.
	 */
	public override shared ubyte[] receive() {
		return (cast()this.stream).receive();
	}

	/**
	 * Sends a packet through the stream.
	 * Returns: the value returned by the stream's `send`
	 */
	public shared synchronized override ptrdiff_t send(ubyte[] buffer) {
		return (cast()this.stream).send(buffer);
	}

	/**
	 * Closes the connection with the hub.
	 * The socket is shut down in both directions before being closed, as
	 * recommended by std.socket for connection-oriented sockets; this also
	 * lets a thread blocked in `receive` notice that the connection ended.
	 */
	public shared override void close() {
		auto socket = cast()this.stream.stream.socket;
		// shutdown ignores errors, so it is safe on an already broken connection
		socket.shutdown(SocketShutdown.BOTH);
		socket.close();
	}

}
117 
/**
 * Address that carries a `std.concurrency.Tid` instead of a network
 * address. Extends `UnknownAddress`, so it can be passed where an
 * `Address` is expected.
 */
class TidAddress : UnknownAddress {

	/// Thread id carried by this address.
	public std.concurrency.Tid tid;

	/// Makes the address implicitly usable as its `Tid`.
	alias tid this;

	/**
	 * Params:
	 * 		tid = thread id to store in the address
	 */
	public this(std.concurrency.Tid tid) {
		this.tid = tid;
	}

}
129 
/**
 * Handler that exchanges packets with the hub using the message
 * passing functions of `std.concurrency` instead of a socket.
 */
class MessagePassingHandler : Handler {

	/// Thread of the hub, to which the packets are sent.
	public std.concurrency.Tid hub;

	/**
	 * Stores the hub's thread id and sends it the id of the
	 * thread that is constructing the handler.
	 * Params:
	 * 		hub = thread id of the hub
	 */
	public shared this(shared std.concurrency.Tid hub) {
		super();
		this.hub = hub;
		auto self = std.concurrency.thisTid;
		std.concurrency.send(cast()hub, self);
	}

	/**
	 * Waits for a message of type `immutable(ubyte)[]` in the mailbox
	 * of the calling thread.
	 * Returns: a mutable copy of the received bytes
	 */
	public shared override ubyte[] receive() {
		immutable(ubyte)[] packet = std.concurrency.receiveOnly!(immutable(ubyte)[])();
		return packet.dup;
	}

	/**
	 * Sends an immutable copy of the buffer to the hub's thread.
	 * Returns: the length of the buffer
	 */
	public shared synchronized override ptrdiff_t send(ubyte[] buffer) {
		immutable(ubyte)[] copy = buffer.idup;
		std.concurrency.send(cast()this.hub, copy);
		return buffer.length;
	}

	/**
	 * Does nothing.
	 */
	public shared override void close() {}

}