What is Megaphone?

The Megaphone project is about enhancing open source chat software. Specifically, the goal is to allow ejabberd to support 1,000,000 simultaneous users. See The Plan page for more details on how I plan to solve this problem. See the About this Blog page for more details on why I created this blog.

Tuesday, January 31, 2012

Testing

Previously...
  • I coded up the put_data function...again
  • I fixed some problems with remove_waiter and wrote the notify_waiter function.
  • I finished coding up the megaphone module.

Now that everything is more or less coded up, it's time to create some tests for what I have.  My goal is to save some time by not having to spend lots of time debugging things - it is faster to either inspect or test code than it is to debug it.  

Looking at the megaphone module, since it is the most recent, I can see the following cases:
  • socket_type
  • start
  • main_loop
  • recv
  • send
  • get_data
  • put_data
  • update_table
  • remove_waiter
  • notify_waiter
  • parse_packet
  • parse_headers
  • receive_packet
  • shut_down
  • send_data
There is a point of diminishing returns for testing, so I am going to eliminate testing for the most trivial functions. This includes the following:
  • socket_type
  • start_module
  • send
  • receive_packet
I suppose I shouldn't find it odd that testing requires an amount of effort that is proportional to the amount of time that it takes to develop the software in the first place, but it won't stop me from complaining about it.

So the final list is:
  • start
  • main_loop
  • recv
  • get_data
  • put_data
  • update_table
  • remove_waiter
  • notify_waiter
  • parse_packet
  • parse_headers
  • shut_down
  • send_data
And in the spirit of procrastination I'll call it quits.  Next time, the first of the tests.

Monday, January 30, 2012

megaphone VII: Parsing Packets

Previously...
  • I coded up the put_data function
  • I coded up the put_data function...again
  • I fixed some problems with remove_waiter and wrote the notify_waiter function.

After the amount of coding that I had to do for some of the other functions, the packet decoding stuff was kind of anti-climactic:


parse_packet(Data) ->
    %% decode_packet takes (Type, Binary, Options); Data must be a binary
    {ok, Request, Remainder} = erlang:decode_packet(http, Data, []),
    parse_headers([Request], Remainder).

parse_headers(Headers, Data) ->
    case erlang:decode_packet(httph, Data, []) of
        %% wrap each result in a list so that ++ keeps Headers a proper list
        { ok, http_eoh, Rest } -> { Headers ++ [http_eoh], Rest };
        { ok, Header, Rest } -> parse_headers(Headers ++ [Header], Rest)
    end.

Not having any error handling might have something to do with it, but cryptic messages filled with weird braces are the erlang way!  Seriously, I'm going to put a reminder in the parking lot to go back and do a better job at error handling for this.

The erlang decode_packet function expects the caller to maintain some state as to where the parse is at between calls.  Hence parse_packet makes the first call to decode_packet with the http argument.  Subsequent calls are made in parse_headers and the httph argument is used.

This loop continues until decode_packet returns http_eoh, at which point the only thing left is the actual body of the request.
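
To make the http/httph hand-off concrete, here is a minimal, self-contained sketch of the same loop.  The module name http_parse_sketch and the {Request, Headers, Body} return shape are my own choices for illustration, not what megaphone uses.  Unlike the version above, it passes {more, _} and {error, _} back to the caller instead of crashing on them.

```erlang
-module(http_parse_sketch).
-export([parse/1]).

%% Parse one HTTP request held in a binary.
%% Returns {RequestLine, Headers, Body} on success.  Any other result from
%% decode_packet ({more, _} or {error, _}) is handed back unchanged.
parse(Bin) when is_binary(Bin) ->
    %% The first call uses the http type to decode the request line.
    case erlang:decode_packet(http, Bin, []) of
        {ok, Request, Rest} -> headers(Request, [], Rest);
        Other -> Other
    end.

%% Every later call uses the httph type, one header per call,
%% until decode_packet reports the end of the headers.
headers(Request, Acc, Bin) ->
    case erlang:decode_packet(httph, Bin, []) of
        {ok, http_eoh, Body} -> {Request, lists:reverse(Acc), Body};
        {ok, Header, Rest} -> headers(Request, [Header | Acc], Rest);
        Other -> Other
    end.
```

Accumulating with [Header | Acc] and reversing once at the end avoids copying the list on every ++.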

Next time: some odds and ends.

megaphone VI: Waiting for Data

Previously...
  • I coded up the get_data function
  • I coded up the put_data function
  • I coded up the put_data function...again

While coding up the stuff to notify the waiter, I noticed that the code to get the waiter was using the wrong hash table.  Mind you, I knew that when I was making that post!  It was all a test to see if any alert readers would catch that problem.  

But nobody did.

I hope you have all learned an important lesson from this, and it is not that I am a spastic programmer.  Who makes tons of mistakes.  Yeah, well, anyways, here is the revised version of remove_waiter:

remove_waiter(ConnectionID, State) ->
    Table = State#megaphone_main_state.waiters,
    case dict:is_key(ConnectionID, Table) of
        false -> { undefined, State};
        true ->
            { Waiter, NewTable } = case dict:fetch(ConnectionID, Table) of
                [ PID | Rest ] ->
                    UpdatedTable = dict:store(ConnectionID, Rest, Table),
                    { PID, UpdatedTable};

                [] ->
                    UpdatedTable = dict:erase(ConnectionID, Table),
                    { undefined, UpdatedTable}
            end,
            NewState = State#megaphone_main_state{waiters = NewTable},
            { Waiter, NewState }
    end.

And here is the code for notify_waiter:

notify_waiter(Waiter, ConnectionID, State) ->
    Table = State#megaphone_main_state.connectionToResults,
    case dict:is_key(ConnectionID, Table) of
        false ->
            ?ERROR_MSG("could not find data that we were just asked to deliver!  Connection ID: ~p", [ConnectionID]),
            State;

        true ->
            case dict:fetch (ConnectionID, Table) of
                [ Head | Tail ] ->
                    Waiter ! { ok, Head },
                    NewTable = dict:store(ConnectionID, Tail, Table),
                    State#megaphone_main_state{connectionToResults = NewTable};

                [] ->
                    ?ERROR_MSG("Data we were supposed to deliver is missing for connection ID: ~p", [ConnectionID]),
                    NewTable = dict:erase(ConnectionID, Table),
                    State#megaphone_main_state{connectionToResults = NewTable}
            end
    end.

For next time, parsing HTTP packets.

Saturday, January 28, 2012

megaphone V: Data Incoming

Previously...
  • I coded up the interface functions for the megaphone module.
  • I coded up the get_data function
  • I coded up the put_data function

Showing my usual decisive nature, I decided to change my mind about some of the code I had written.  Specifically, the put_data function.  The new version looks like this:


put_data(State, ConnectionID, Data) ->
    NewState = update_table(State, ConnectionID, Data),
    %% thread the updated state through, otherwise the new data is lost
    { Result, NewerState } = remove_waiter(ConnectionID, NewState),
    case Result of
        undefined -> NewerState;

        Waiter ->
            notify_waiter(Waiter, ConnectionID, NewerState)
    end.

Note that get_waiter_for has been replaced with remove_waiter.  In addition to ending with a noun instead of a preposition, the new function returns an updated version of the module state.  This is because it changes the state of the module.  By removing the first waiter from the list of waiters for the connection.  Hence the name.  Yeah.

While I was tempted to call it quits for this post after changing a few lines in one source file, in a burst of creativity I also wrote the remove_waiter function:

remove_waiter(ConnectionID, State) ->
    Table = State#megaphone_main_state.connectionToResults,
    case dict:is_key(ConnectionID, Table) of
        false -> { undefined, State};
        true ->
            %% bind Waiter in both branches so the result below is always defined
            { Waiter, NewTable } = case dict:fetch(ConnectionID, Table) of
                [ First | Rest ] ->
                    { First, dict:store(ConnectionID, Rest, Table) };

                [] ->
                    { undefined, dict:erase(ConnectionID, Table) }
            end,
            NewState = State#megaphone_main_state{connectionToResults = NewTable},
            { Waiter, NewState }
    end.

I know, the excitement is enough to bowl you over.  I'm gonna go take a nap now.

Next time, more of the megaphone module internals.

Friday, January 27, 2012

megaphone IV: Receiving Data

Previously...
  • I created the main loop for the megaphone module.
  • I coded up the interface functions for the megaphone module.
  • I coded up the get_data function

This time I am going to talk about the function to process data that has been received for a virtual connection.  Interestingly enough, this logic is more complex than what I wrote for the client to retrieve a packet.  Here is the main entry point into this subsystem: the function to put a new packet into the system:


put_data(State, ConnectionID, Data) ->
    NewState = update_table(State, ConnectionID, Data),
    case get_waiter_for(ConnectionID, NewState) of
        undefined ->
            NewState;

        PID ->
            %% pass NewState so the waiter sees the data that was just added
            notify_waiter(PID, ConnectionID, NewState)
    end.

First the method adds the data to the table of data (which I will detail in just a sec).  Next the function checks to see if anyone is waiting for data to arrive on that connection.  If so, then the waiter should be sent the data immediately.  


Here is the code for update_table:


update_table(State, ConnectionID, Data) ->
    List = parse_packet(Data),
    Table = State#megaphone_main_state.connectionToResults,
    %% dict:fetch crashes on a missing key, so treat a new connection as empty
    Old = case dict:find(ConnectionID, Table) of
        { ok, Existing } -> Existing;
        error -> []
    end,
    NewList = Old ++ List,
    NewTable = dict:store(ConnectionID, NewList, Table),
    State#megaphone_main_state{connectionToResults = NewTable}.


This function updates the table that maps from connections to data.  The table stores a list of results as returned by erlang:decode_packet.  That function expects a different parameter depending on where in the packet one is.  To ensure that the function is always called at the start of an HTTP packet, megaphone_receiver only sends complete packets on to the megaphone process.

Next time, more from the internals of the megaphone module.

Thursday, January 26, 2012

megaphone III: Getting Data

Previously...
  • I did some more planning for the megaphone module.
  • I created the main loop for the megaphone module.
  • I coded up the interface functions for the megaphone module.

The first part that I'm going to talk about is the function that is used when ejabberd wants to get new data from a connection.  The data itself is stored in an erlang dictionary as a list of results.  The table is indexed via connection ID.  Here is the code for the function:

get_data(State, From, ConnectionID) ->
    Table = State#megaphone_main_state.connectionToResults,
    case dict:is_key(ConnectionID, Table) of
        false ->
            State;

        true ->
            case dict:fetch(ConnectionID, Table) of
                [ Head | Rest ] ->
                    From ! { ok, Head },
                    Table2 = dict:store(ConnectionID, Rest, Table),
                    State#megaphone_main_state{connectionToResults = Table2};

                [] ->
                    %% remove_waiter expects a list of waiters per connection,
                    %% so append rather than overwrite
                    Waiters = State#megaphone_main_state.waiters,
                    Waiters2 = dict:append(ConnectionID, From, Waiters),
                    State#megaphone_main_state{waiters = Waiters2}
            end
    end.


The dictionary module reacts violently if you ask for a key that is not present, hence the need for a call to dict:is_key before trying to access the data.  erlang is a little strange in the way it does if-then-else statements in that you are restricted in the conditional statement.  A case statement does not have this restriction, hence I often find myself using a case where I would normally use an if in a language like Java.  

Basically I'm too lazy to check to see whether an if statement would do, so I usually use case.

Sigh.
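
For what it's worth, here is a small sketch of the two points above.  The module and function names are made up for illustration.  The first function uses dict:find, which returns {ok, Value} or the atom error instead of blowing up, so the is_key check is not needed.  The second shows the kind of test an if will accept: guard expressions only.

```erlang
-module(dict_lookup_sketch).
-export([results_for/2, classify/1]).

%% dict:find/2 never throws on a missing key, so one lookup is enough.
results_for(ConnectionID, Table) ->
    case dict:find(ConnectionID, Table) of
        {ok, Results} -> Results;
        error -> []
    end.

%% An if only accepts guard expressions.  length/1 is allowed in a guard,
%% but a call such as dict:is_key/2 is not, which is why case is the
%% usual choice whenever the test involves an ordinary function call.
classify(List) ->
    if
        length(List) > 0 -> has_data;
        true -> empty
    end.
```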

Anyhow, the contents of the table are lists of results that I get out of erlang:decode_packet.  If some data exists for the requested ConnectionID, then the function responds back to the requester with the data.  If the list is empty of results then the client needs to block waiting for data to arrive.  

Waiting is implemented by creating a callback table: when the data arrives the waiting process is sent a message with the data.  But the megaphone process needs some way of figuring out who to send messages to: hence the callback table.

I suppose I could use the callback table to also send annoying messages and Muzak akin to what human beings have to put up with while on hold but I am basically a kindly person.
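
Here is a stripped-down sketch of the callback table idea, boiled down to one process with two dictionaries.  Everything in it (the module name waiter_sketch, the deliver function, keeping the two tables as plain loop arguments rather than in a record) is an assumption for illustration and not the megaphone code itself.

```erlang
-module(waiter_sketch).
-export([start/0, recv/2, deliver/3]).

%% Start the table-holding process and return its pid.
start() ->
    spawn(fun() -> loop(dict:new(), dict:new()) end).

%% Block the caller until data for ConnectionID is available.
recv(Server, ConnectionID) ->
    Server ! {get_data, self(), ConnectionID},
    receive
        {ok, Data} -> {ok, Data}
    end.

%% Hand new data for ConnectionID to the server; never blocks.
deliver(Server, ConnectionID, Data) ->
    Server ! {put_data, ConnectionID, Data},
    ok.

%% Results maps a connection to queued data, Waiters maps it to blocked pids.
loop(Results, Waiters) ->
    receive
        {get_data, From, ID} ->
            case fetch_list(ID, Results) of
                [Head | Rest] ->
                    From ! {ok, Head},
                    loop(dict:store(ID, Rest, Results), Waiters);
                [] ->
                    %% nothing yet: remember who to call back
                    loop(Results, dict:append(ID, From, Waiters))
            end;
        {put_data, ID, Data} ->
            case fetch_list(ID, Waiters) of
                [Pid | Rest] ->
                    Pid ! {ok, Data},
                    loop(Results, dict:store(ID, Rest, Waiters));
                [] ->
                    loop(dict:append(ID, Data, Results), Waiters)
            end
    end.

%% A missing key is treated the same as an empty list.
fetch_list(Key, Dict) ->
    case dict:find(Key, Dict) of
        {ok, List} -> List;
        error -> []
    end.
```

A caller that arrives first is parked in Waiters and gets its message when data shows up.  Data that arrives first is queued in Results until somebody asks for it.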

Next time, the code for when data comes in to the megaphone system.

Wednesday, January 25, 2012

megaphone II: Client Interface

Previously...
  • I started planning the megaphone module.
  • I did some more planning for the megaphone module.
  • I created the main loop for the megaphone module.

The main loop is all well and good but how do messages actually get to the megaphone process?  When a client is asking for data, the function needs to block until some data is available.  ejabberd also expects a certain interface, like having a function called "recv" for the BOSH stuff to call.  Here is my first go at the function:


recv(ConnectionID) ->
    megaphone ! { get_data, self(), ConnectionID },
    receive
        { ok, Result } ->
            { ok, Result };


        { error, Reason } ->
            { error, Reason };


        { shutdown } ->
            { error, shutdown }
    end.
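
One thing the version above does not do is give up: if no reply ever comes, the caller blocks forever.  Below is a sketch of the same receive with an after clause.  Whether the BOSH code actually wants a timeout is an assumption on my part, and the function name await_reply is made up.

```erlang
-module(recv_timeout_sketch).
-export([await_reply/1]).

%% Wait up to Timeout milliseconds for one of the replies that recv expects.
await_reply(Timeout) ->
    receive
        {ok, Result} -> {ok, Result};
        {error, Reason} -> {error, Reason};
        {shutdown} -> {error, shutdown}
    after Timeout ->
        {error, timeout}
    end.
```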

The function to allow clients to send data does not have to worry about blocking and is much simpler:

send(ConnectionID, Data) ->
    megaphone ! { send_data, ConnectionID, Data }.

The function that megaphone_receiver uses to hand data off to ejabberd is also simple:

receive_packet(ConnectionID, Data) ->
    megaphone_sender ! {send, ConnectionID, Data}.

So much for the interface methods.  Next time I will go into the implementation of the main_loop methods.

Tuesday, January 24, 2012

megaphone I

Previously...
  • I created the sender.
  • I started planning the megaphone module.
  • I did some more planning for the megaphone module.

The start of the megaphone module needs to kick off the sender and the receiver:

start({gen_tcp, Socket}, _Opt) -> 
    megaphone_receiver:start(Socket),
    megaphone_sender:start(Socket),
    Pid = spawn(megaphone, start_module, []),
    register(megaphone, Pid).

Incidentally, I haven't tested any of this code so it probably doesn't work.  In fact, I'm counting on finding, revising and fixing the various bugs in the system to serve as the foundation for many tedious blog posts.  This is partially because I need something to write about but also because that is the way that real software happens.

Anyways, back to the module.  The start_module function just sets up the state for the new process:

start_module() ->
    %% both tables must exist before get_data or put_data touch them
    State = #megaphone_main_state{connectionToResults = dict:new(),
                                  waiters = dict:new()},
    main_loop(State).
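
The record itself never appears in these posts, so here is a guess at its shape.  The two field names come from the code in this series.  The defaults, the module name state_sketch and the helper functions are assumptions for illustration.

```erlang
-module(state_sketch).
-export([new/0, tables/1]).

%% Assumed definition: one dict of queued results and one dict of waiting
%% pids, both keyed by connection ID.  The defaults are evaluated each time
%% a record is created.
-record(megaphone_main_state, {
    connectionToResults = dict:new(),
    waiters = dict:new()
}).

%% Build a fresh state with both tables empty.
new() ->
    #megaphone_main_state{}.

%% Return both tables, mostly so the record can be inspected from outside.
tables(#megaphone_main_state{connectionToResults = Results, waiters = Waiters}) ->
    {Results, Waiters}.
```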

The main_loop function sits in a loop waiting for events to process.  Specifically, it waits for messages from the receiver for new packets and the clients asking for new packets:

main_loop(State) ->
    receive
        { get_data, From, ConnectionID } ->
            State2 = get_data(State, From, ConnectionID),
            main_loop(State2);

        { put_data, ConnectionID, Data } ->
            State2 = put_data(State, ConnectionID, Data),
            main_loop(State2);

        { send_data, ConnectionID, Data } ->
            State2 = send_data(State, ConnectionID, Data ),
            main_loop(State2);

        { shutdown } ->
            megaphone_receiver ! { shutdown },
            megaphone_sender ! { shutdown },
            ok
    end.

It is not clear to me whether the logic that deals with multiplexing outbound messages really belongs in megaphone or in one of the sender or receiver modules.  For right now the notion of having one interface that does everything is a nice goal to strive for, with the understanding that I can change my mind and use a different architecture if it appears that the newer approach is better.  We shall see.
  
I'm adding the shutdown message with an eye towards being able to gracefully shut down the megaphone module.  Note that, when propagating the shutdown message to the sender and the receiver, megaphone does not wait for those modules to actually complete a shutdown.  This is because there is no guarantee that these processes will actually be running (they may have crashed).  

I will probably add some sort of restart capability for those rare (HA) situations where one of the sub-modules has crashed and needs to be restarted.  I could also use the monitor capability.

The nice thing about being decisive is that you can make a decision over and over again.
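
In case I do go the monitor route, here is roughly what it looks like.  This is a sketch, not a restart strategy: the module name monitor_sketch and the function run_and_watch are made up, and all it does is report why the watched process went away.

```erlang
-module(monitor_sketch).
-export([run_and_watch/1]).

%% Spawn Fun in a new process and monitor it in one atomic step, then block
%% until the process exits and return the reason it gave.
run_and_watch(Fun) ->
    {Pid, Ref} = spawn_monitor(Fun),
    receive
        {'DOWN', Ref, process, Pid, Reason} -> {down, Reason}
    end.
```

A main loop would not block like this.  It would add a 'DOWN' clause to its existing receive and decide there whether to restart the sender or the receiver.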

Next time, the implementations of the various main_loop functions. 

Monday, January 23, 2012

The Main megaphone Module

Previously...
  • I finished the receiver.
  • I created the sender.
  • I started planning the megaphone module.

The main megaphone module, aside from being a phrase that has 3 words that start with the letter "m" and being difficult to say quickly, sits in a process by itself and mostly deals with incoming data.  

The basic problem is that there is no way I know of to wait for data on a TCP socket while simultaneously waiting for an erlang message.  Clients want to wait for new data to arrive on the TCP socket that is bound for the virtual connection.  Those clients also want the data to be parsed out into the chunks that one would get from erlang:decode_packet in http mode.  Thus clients have the following loop:
  • Get the next HTTP header or body, waiting if necessary.
  • Do something with the result.
A straightforward way of solving this problem is to create a new process that waits for the next segment from the TCP connection and then have it notify any erlang processes that are waiting for data for the corresponding virtual connection.  In order to do this, the TCP waiter needs to know which processes are waiting for data from a given virtual connection.

That problem can be solved by having the TCP waiter wait for erlang messages that contain the erlang PID of the process that is interested in new data for each virtual connection. The server process simply waits in a loop for messages that contain registration requests.  This requires that the TCP waiter also wait for erlang messages, which I don't know how to do.

The solution I came up with is to have the TCP waiter in one process, called megaphone_receiver, and the function that registers clients that want data for a virtual connection in another process, called the megaphone process.  I admit that this is a somewhat cumbersome process, so I'm hoping that one of the many readers of this blog will comment on this post with a more elegant solution.  

So go ahead and write in.

Any time now.

Well, while I'm waiting I'll just start coding this up...
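
A rough sketch of that split, under a pile of assumptions: the module name split_demo, the message shapes and the loopback demo function are all invented for illustration.  The receiver process does nothing but block on the socket and forward whatever arrives as an erlang message, which leaves the other process free to wait on messages only.

```erlang
-module(split_demo).
-export([receiver/2, demo/0]).

%% Receiver process: blocks on the socket and forwards each chunk to the
%% coordinator as an ordinary message.
receiver(Socket, Coordinator) ->
    case gen_tcp:recv(Socket, 0) of
        {ok, Data} ->
            Coordinator ! {put_data, Data},
            receiver(Socket, Coordinator);
        {error, Reason} ->
            Coordinator ! {receiver_stopped, Reason}
    end.

%% Loopback demonstration: open a listener on an ephemeral port, connect to
%% it, hand the accepted socket to a receiver process and send one chunk.
demo() ->
    Opts = [binary, {active, false}],
    {ok, Listen} = gen_tcp:listen(0, [{ip, {127, 0, 0, 1}} | Opts]),
    {ok, Port} = inet:port(Listen),
    {ok, Client} = gen_tcp:connect({127, 0, 0, 1}, Port, Opts),
    {ok, Server} = gen_tcp:accept(Listen),
    Self = self(),
    Pid = spawn(fun() -> receiver(Server, Self) end),
    ok = gen_tcp:controlling_process(Server, Pid),
    ok = gen_tcp:send(Client, <<"hello">>),
    %% The coordinator side only ever waits on messages.
    Result = receive
        {put_data, Data} -> Data
    after 1000 ->
        timeout
    end,
    gen_tcp:close(Client),
    gen_tcp:close(Listen),
    Result.
```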

Sunday, January 22, 2012

What ejabberd Wants



Previously...
  • I created the part that receives headers.
  • I finished the receiver
  • I created the sender

The next part is the megaphone module itself.  This needs to do two major things:
  • Present a consistent interface to ejabberd_http and ejabberd_http_bind.
  • Buffer data.
The existing ejabberd BOSH module (ejabberd_http_bind and to some extent ejabberd_http) uses a sort of generic socket module for communicating with the client.  The functions that I have found that need to be implemented are:
  • peername
  • recv
peername is used by ejabberd when determining "X-Forwarded-For."  It is not clear to me why ejabberd would care about which servers are forwarding packets on behalf of what other servers.  For the time being, I'm going to focus on getting megaphone to simply work, so this function is simply defined thusly:


peername(_Socket) ->
    {ok, {{127, 0, 0, 1}, 1234}}.

The recv function ties in with data buffering.  Interestingly, the BOSH portion never seems to receive any data; instead, ejabberd_http is the part that does this and the data is simply passed on to the BOSH part.

Next time, more on the ejabberd/megaphone interface.


Saturday, January 21, 2012

megaphone_sender

Previously...
  • I made some plans about how to create megaphone_tcp.
    • Determine if segment contains a complete header
      • If not, then keep reading until you do have a complete header
    • Parse out the connection ID and packet length, 
      • If you have a complete header, then determine if you have a complete megaphone packet
        • If not, then keep reading until you have a complete megaphone packet
    • Send the megaphone process the completed packet.
    • Wait for another packet to arrive.
  • I created the part that receives headers.
  • I finished the receiver

As it turned out, sending the packet off to the megaphone process was very simple.  Here is the code:


give_packet_to_megaphone (ConnectionID, Packet) ->
    megaphone ! { ConnectionID, Packet }.

Perhaps I need to work on the name a bit, but the function is ready.  At least for testing.

Emboldened by this, I went on and started writing the part of the system that sends packets to ECM.  In a burst of creativity I named this "megaphone_sender."  The startup for this module doesn't do much, just starts up a new process and then registers the PID as "megaphone_sender."  Here is the startup:


start(Socket, Config) ->
    State = #megaphone_state{socket = Socket, config = Config},
    Pid = spawn(megaphone_sender, main_loop, [State]),
    register(megaphone_sender, Pid).

The main loop basically just sits around waiting for packets and then sends them off to ECM:

main_loop (State) ->
    receive
        { ConnectionID, Packet } ->
            send_packet(State#megaphone_state.socket, ConnectionID, Packet),
            main_loop(State);

        %% megaphone sends the tuple { shutdown }, so match that form
        { shutdown } ->
            ok
    end.

The send_packet function is also pretty simple and just formats the packet:

send_packet(Socket, ConnectionID, Content) ->
    %% ConnectionID and Content are assumed to be strings (lists)
    PacketLength = length(Content) + 32,
    StrPacketLength = lists:flatten(io_lib:format("~10..0B", [ PacketLength ])),
    Packet = StrPacketLength ++ "|" ++ ConnectionID ++ "|" ++ Content,
    gen_tcp:send(Socket, Packet).
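
To show what ends up on the wire, here is the formatting part on its own, without the socket.  The 32 in the code above is the header length, so I am assuming a layout of a 10-digit length field, a "|", a 20-character connection ID and another "|".  The 20-character figure is inferred from the arithmetic rather than stated anywhere, and the module name frame_sketch is made up.

```erlang
-module(frame_sketch).
-export([frame/2]).

%% Assumed header size: 10 (length) + 1 ("|") + 20 (connection ID) + 1 ("|").
-define(HEADER_LENGTH, 32).

%% Build a megaphone packet as a flat string.
%% ConnectionID and Content are strings (lists).
frame(ConnectionID, Content) ->
    Total = length(Content) + ?HEADER_LENGTH,
    %% "~10..0B" prints a base 10 integer in a field 10 wide, padded with 0
    LengthField = lists:flatten(io_lib:format("~10..0B", [Total])),
    LengthField ++ "|" ++ ConnectionID ++ "|" ++ Content.
```

With a 20-character connection ID and the content "hello", the length field comes out as 0000000037, which is also the length of the whole string.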

The next step is to refactor the megaphone module to take into account that the ECM communications logic has been moved to other modules.


Friday, January 20, 2012

Splitting megaphone_tcp

Previously...
  • I made some plans about how to create megaphone_tcp.
    • Determine if segment contains a complete header
      • If not, then keep reading until you do have a complete header
    • Parse out the connection ID and packet length, 
      • If you have a complete header, then determine if you have a complete megaphone packet
        • If not, then keep reading until you have a complete megaphone packet
    • Send the megaphone process the completed packet.
    • Wait for another packet to arrive.
  • I created receive_loop, which takes care of controlling the process.
  • I created the part that receives headers.

While writing the portion of megaphone_tcp that handles extracting the payload, it occurred to me that the name "megaphone_tcp" was something of a misnomer.  Rather than handling all things TCP, it really wanted to handle just those things relating to receiving a packet from TCP.  

Therefore, to further muddy the waters, I resolved to rename megaphone_tcp to megaphone_receiver.  At some point real soon now (ha!) I will create another module called megaphone_sender or megaphone_ecm to handle all things related to sending packets to ECM.

I modified the receive_header function so that it actually compiles.  It is basically the same, so I will not reprint it here.  Rather I will publish it along with the rest of the megaphone functionality on github.  Note that currently the megaphone-ejabberd account there does not have anything in it.

Here is the code for receiving the body of the message:

receive_packet_body (Socket, Data, Length) ->
    case byte_size(Data) of
        %%
        %% need more data to fill out the packet
        %%
        ContentLength when ContentLength < Length ->
            case gen_tcp:recv(Socket, 0) of
                {ok, NewData} ->
                    %% Data is a binary (the socket is assumed to be in
                    %% binary mode), so concatenate with binary syntax
                    Data2 = <<Data/binary, NewData/binary>>,
                    receive_packet_body (Socket, Data2, Length)
            end;

        %%
        %% we have enough to fill out the packet
        %%
        _ ->
            PacketData = binary_part(Data, 0, Length),

            %%
            %% Check for leftovers
            %%
            if
                byte_size(Data) == Length ->
                    { PacketData, undefined };
                true ->
                    LeftOvers = binary_part(Data, Length, byte_size(Data) - Length),
                    { PacketData, LeftOvers }
            end
    end.
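
The leftover handling can also be written with binary pattern matching, which does the split in one step.  This is only a sketch of the pure part, with no socket involved.  The module name split_sketch and the {ok, ...} / {more, ...} return values are my own invention, and it returns an empty binary for "no leftovers" where the code above returns undefined.

```erlang
-module(split_sketch).
-export([take/2]).

%% Split Length bytes off the front of Data.
%% Returns {ok, Packet, LeftOvers} when enough bytes are present, otherwise
%% {more, Missing} with the number of bytes still needed.
take(Data, Length) when byte_size(Data) >= Length ->
    <<Packet:Length/binary, LeftOvers/binary>> = Data,
    {ok, Packet, LeftOvers};
take(Data, Length) ->
    {more, Length - byte_size(Data)}.
```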

For next time, the code that hands the packet off to the megaphone process.

Wednesday, January 18, 2012

receive_packet_headers

Previously...
  • I did an analysis of the event sequence that should take place in order to use a single process.
  • I made some plans about how to create megaphone_tcp.
    • Determine if segment contains a complete header
      • If not, then keep reading until you do have a complete header
    • Parse out the connection ID and packet length, 
      • If you have a complete header, then determine if you have a complete megaphone packet
        • If not, then keep reading until you have a complete megaphone packet
    • Send the megaphone process the completed packet.
    • Wait for another packet to arrive.
  • I created receive_loop, which takes care of controlling the process.
In today's report, I created receive_header:


%%
%% extract a complete megaphone header from the supplied data and from the 
%% supplied socket if the data is not enough.
%%
%% This function attempts to combine any leftover data with any newly received
%% data.  It then determines if it has enough data to form a complete megaphone
%% header.  If it does then it extracts out the fields from the data according
%% to the standard locations and returns them.
%%
%% If the function does not have enough data to form a complete megaphone 
%% header, it will try to read more data from the supplied socket until it 
%% receives a complete packet.
%%
%% Socket --- The TCP socket that should be used to get more data.
%% LeftOvers --- Any data that should appear before the Data parameter.
%%     This value may be undefined if there was no left over data from the 
%%     previous packet.
%% Data --- Some newly received data from the socket.  This should appear after 
%%     the leftovers from the previous packet, if any exist.
%%
%% Returns { Length, CID, Content } where Length is an integer that is the 
%% number of bytes in the complete message, including the megaphone header.
%% CID is the connection ID of the connection, as a string (list).  Content
%% is the content, if any is left over from the header.  If there is no 
%% data left over from the header, then this value is returned as undefined.
%% 
receive_header(Socket, LeftOvers, Data) ->
    %% LeftOvers and Data are binaries, so join them with binary syntax
    Data2 = case LeftOvers of
        undefined -> Data;
        _ -> <<LeftOvers/binary, Data/binary>>
    end,
    case byte_size(Data2) of
        Length when Length < ?PREFIX_LENGTH ->
            case gen_tcp:recv(Socket, 0) of
                {ok, NewData} ->
                    receive_header(Socket, Data2, NewData)
            end;

        _ ->
            extract_CID_length_body(Data2)
    end.



This function takes care of detecting when the system has not received a complete packet and therefore needs to wait for more data from the client.  It uses the extract_CID_length_body function to extract out the data from the packet:

%%
%% extract out the packet length, the connection ID and the packet contents
%% from a binary block of data.
%%
%% Binary --- this should be the binary representation of a megaphone packet
%%    header.  The binary may also contain the partial or complete contents
%%    of a megaphone packet content.
%%
%% The function returns the tuple {Length, CID, Content} where Length is the 
%% length of the complete packet, including the megaphone header.  CID is the 
%% connection ID as a string (list).  Content is the megaphone packet contents.
%%
%% The input packet may contain no content or only part of the content of the 
%% packet and may therefore have a value of undefined or it may be a binary 
%% whose length is less than that of the complete content.
%%
extract_CID_length_body(Binary) ->
    BinaryPacketLength = binary_part(Binary, ?PACKET_LENGTH_START, ?PACKET_LENGTH_LENGTH),
    StringPacketLength = binary_to_list(BinaryPacketLength),
    { PacketLength, _ } = string:to_integer(StringPacketLength),

    BinaryConnectionID = binary_part(Binary, ?PACKET_CID_START, ?PACKET_CID_LENGTH),
    StringConnectionID = binary_to_list(BinaryConnectionID),

    BinaryContent = binary_part(Binary, ?PACKET_CONTENT_START, byte_size(Binary) - ?PACKET_HEADER_LENGTH),
    {PacketLength, StringConnectionID, BinaryContent}.
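
The macros used above are not shown in this post, so here is a sketch of the same extraction with the layout spelled out in a binary pattern.  The layout (10 ASCII digits, "|", a 20-byte connection ID, "|", then content) is an assumption, as are the module name header_sketch and the {error, bad_header} result.

```erlang
-module(header_sketch).
-export([parse/1]).

%% Assumed header layout: 10 ASCII digits, "|", 20-byte connection ID, "|".
%% Returns {Length, CID, Content} with CID as a string and Content as a
%% binary, possibly empty or only part of the full content.
parse(<<LenField:10/binary, $|, CID:20/binary, $|, Content/binary>>) ->
    {binary_to_integer(LenField), binary_to_list(CID), Content};
parse(_) ->
    %% too short, or the separators are not where they should be
    {error, bad_header}.
```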

Next time, more functions that handle the extraction of packets.