1 /* 2 * Copyright (c) 2017-2018 sel-project 3 * 4 * Permission is hereby granted, free of charge, to any person obtaining a copy 5 * of this software and associated documentation files (the "Software"), to deal 6 * in the Software without restriction, including without limitation the rights 7 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell 8 * copies of the Software, and to permit persons to whom the Software is 9 * furnished to do so, subject to the following conditions: 10 * 11 * The above copyright notice and this permission notice shall be included in all 12 * copies or substantial portions of the Software. 13 * 14 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR 15 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, 16 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE 17 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER 18 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, 19 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE 20 * SOFTWARE. 21 * 22 */ 23 /** 24 * Copyright: 2017-2018 sel-project 25 * License: MIT 26 * Authors: Kripth 27 * Source: $(HTTP github.com/sel-project/selery/source/selery/node/handler.d, selery/node/handler.d) 28 */ 29 module selery.node.handler; 30 31 debug import core.thread : Thread; 32 33 static import std.concurrency; 34 import std.conv : to; 35 import std.datetime : dur, msecs; 36 import std.socket; 37 import std.system : Endian; 38 import std.variant : Variant; 39 40 import sel.net.modifiers : LengthPrefixedStream; 41 import sel.net.stream : TcpStream; 42 43 alias HncomStream = LengthPrefixedStream!(uint, Endian.littleEndian); 44 45 abstract class Handler { 46 47 private static shared(Handler) n_shared_instance; 48 49 public static nothrow @property @safe @nogc shared(Handler) sharedInstance() { 50 return n_shared_instance; 51 } 52 53 public shared this() { 54 n_shared_instance = this; 55 } 56 57 /** 58 * Receives the next packet when there's one available. 59 * This action is blocking. 60 */ 61 public shared abstract ubyte[] receive(); 62 63 /** 64 * Starts a new thread and send a new message to the server 65 * when a new packet arrives. 66 */ 67 public shared void receiveLoop(std.concurrency.Tid server) { 68 debug Thread.getThis().name = "hncom_client"; 69 while(true) { 70 std.concurrency.send(server, this.receive.idup); 71 } 72 } 73 74 /** 75 * Returns: the amount of bytes sent 76 */ 77 public shared synchronized abstract ptrdiff_t send(ubyte[] buffer); 78 79 /** 80 * Closes the connection with the hub. 81 */ 82 public shared abstract void close(); 83 84 } 85 86 class SocketHandler : Handler { 87 88 private HncomStream stream; 89 90 private ubyte[] n_next; 91 private size_t n_next_length = 0; 92 93 public shared this(Address address) { 94 super(); 95 Socket socket = new TcpSocket(address.addressFamily); 96 socket.setOption(SocketOptionLevel.SOCKET, SocketOption.REUSEADDR, true); 97 //socket.setOption(SocketOptionLevel.SOCKET, SocketOption.RCVTIMEO, dur!"seconds"(5)); 98 //socket.setOption(SocketOptionLevel.SOCKET, SocketOption.SNDTIMEO, dur!"seconds"(2)); 99 socket.blocking = true; 100 socket.connect(address); 101 this.stream = cast(shared)new HncomStream(new TcpStream(socket, 8192)); 102 } 103 104 public override shared ubyte[] receive() { 105 return (cast()this.stream).receive(); 106 } 107 108 public shared synchronized override ptrdiff_t send(ubyte[] buffer) { 109 return (cast()this.stream).send(buffer); 110 } 111 112 public shared override void close() { 113 (cast()this.stream.stream.socket).close(); 114 } 115 116 } 117 118 class TidAddress : UnknownAddress { 119 120 public std.concurrency.Tid tid; 121 122 public this(std.concurrency.Tid tid) { 123 this.tid = tid; 124 } 125 126 alias tid this; 127 128 } 129 130 class MessagePassingHandler : Handler { 131 132 public std.concurrency.Tid hub; 133 134 public shared this(shared std.concurrency.Tid hub) { 135 super(); 136 this.hub = hub; 137 std.concurrency.send(cast()hub, std.concurrency.thisTid); 138 } 139 140 public shared override ubyte[] receive() { 141 return std.concurrency.receiveOnly!(immutable(ubyte)[])().dup; 142 } 143 144 public shared synchronized override ptrdiff_t send(ubyte[] buffer) { 145 std.concurrency.send(cast()this.hub, buffer.idup); 146 return buffer.length; 147 } 148 149 public shared override void close() {} 150 151 }