6

このHaskellサーバーに単純なUDPパケットを送信しています。パケットのソースには、「aspell -lendumpmaster」によって生成されたプレーンテキストファイルを使用します。ただし、120,000を超えるメッセージのリストはすべて機能するはずです。コンシューマーとプロデューサーを同時に起動しても、パケットを失うことはありません。ただし、非常に忙しい消費者をシミュレートできるようにしたいと思います。コンシューマーを開始する前にthreadDelayを20秒間導入すると、パケット損失が発生します。消費を遅らせると、標準の出力とディスクIOをあまり使用しないため、これは直感に反します。遅延バージョンで損失が発生する理由を誰かが説明できますか?消費者が非常に忙しいときに損失(メモリ使用量が増えるだけ)が発生しないように、ソケットとTChanをより適切に管理するにはどうすればよいですか?

import Control.Monad (forever)
import Control.Concurrent (forkIO, threadDelay)
import Control.Concurrent.STM (writeTChan, readTChan, atomically)
import Control.Concurrent.STM.TChan
import Network.Socket hiding (send, sendTo, recv, recvFrom)
import Network.Socket.ByteString
import Data.ByteString hiding(putStrLn, head)
import qualified Data.ByteString.Char8 as Char8 (putStrLn, putStr)
import System.IO

main :: IO ()
main = withSocketsDo $  do
    hSetBuffering stdout NoBuffering
    addrinfos <- getAddrInfo
                 (Just (defaultHints {addrFlags = [AI_PASSIVE]}))
                 Nothing (Just "2000")
    let serveraddr = head addrinfos
    sock <- socket (addrFamily serveraddr) Datagram defaultProtocol
    bindSocket sock (addrAddress serveraddr)
    chan <- newTChanIO
    forkIO(producer chan sock)
    -- Uncomment the threadDelay below to see lossy version
    -- threadDelay (1000000 * 20)
    forkIO(consumer chan)
    forever $ threadDelay (1000000 * 60)

producer :: TChan ByteString -> Socket -> IO ()
producer chan sock = forever $ do
    (msg) <- recv sock 256
    atomically $ writeTChan chan msg

consumer :: TChan ByteString -> IO ()
consumer chan = forever $ do
    msg <- atomically $ readTChan chan
    Char8.putStr msg
4

1 に答える 1

1

データの損失を予期しないのはなぜですか? これは、Haskell だけでなく、どの言語でも発生します。これは、カーネルが各ソケットに対して限られた量のバッファー スペースしか持たないためです。TCP とは異なり、UDP はフロー制御されていないため、カーネルはパケットをドロップするだけです。もちろん、SO_RCVBUF ソケット オプションを使用してバッファ サイズを増やすこともできます。しかし基本的に、UDP はとにかく信頼できるものではないため、それを使用する実際のアプリケーションはパケット損失を処理する必要があります。

于 2015-08-02T23:46:43.150 に答える